diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java index c1629f7..4ecd73b 100644 --- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java @@ -46,6 +46,10 @@ public interface MetricsRESTSource extends BaseSource { String FAILED_PUT_KEY = "failedPut"; String FAILED_DELETE_KEY = "failedDelete"; + + String SUCCESSFUL_SCAN_KEY = "successfulScanCount"; + + String FAILED_SCAN_KEY = "failedScanCount"; /** * Increment the number of requests @@ -95,4 +99,18 @@ public interface MetricsRESTSource extends BaseSource { * @param inc The number of failed delete requests. */ void incrementFailedDeleteRequests(int inc); + + /** + * Increment the number of successful scan requests. + * + * @param inc Number of successful scan requests. + */ + void incrementSucessfulScanRequests(final int inc); + + /** + * Increment the number failed scan requests. + * + * @param inc the inc + */ + void incrementFailedScanRequests(final int inc); } diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java index 94551a6..bd9cc46 100644 --- hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java +++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java @@ -33,9 +33,11 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST private MetricMutableCounterLong sucGet; private MetricMutableCounterLong sucPut; private MetricMutableCounterLong sucDel; + private MetricMutableCounterLong sucScan; private MetricMutableCounterLong fGet; private MetricMutableCounterLong fPut; private MetricMutableCounterLong fDel; + private MetricMutableCounterLong fScan; public MetricsRESTSourceImpl() { this(METRICS_NAME, METRICS_DESCRIPTION, CONTEXT, JMX_CONTEXT); @@ -56,10 +58,12 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST sucGet = getMetricsRegistry().getLongCounter(SUCCESSFUL_GET_KEY, 0l); sucPut = getMetricsRegistry().getLongCounter(SUCCESSFUL_PUT_KEY, 0l); sucDel = getMetricsRegistry().getLongCounter(SUCCESSFUL_DELETE_KEY, 0l); + sucScan = getMetricsRegistry().getLongCounter(SUCCESSFUL_SCAN_KEY, 0L); fGet = getMetricsRegistry().getLongCounter(FAILED_GET_KEY, 0l); fPut = getMetricsRegistry().getLongCounter(FAILED_PUT_KEY, 0l); fDel = getMetricsRegistry().getLongCounter(FAILED_DELETE_KEY, 0l); + fScan = getMetricsRegistry().getLongCounter(FAILED_SCAN_KEY, 0l);; } @Override @@ -96,4 +100,14 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST public void incrementFailedDeleteRequests(int inc) { fDel.incr(inc); } + + @Override + public void incrementSucessfulScanRequests(int inc) { + sucScan.incr(inc); + } + + @Override + public void incrementFailedScanRequests(int inc) { + fScan.incr(inc); + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java index 6abdc6c..bb9237d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java @@ -57,4 +57,14 @@ public interface Constants { static final String REST_DNS_NAMESERVER = "hbase.rest.dns.nameserver"; static final String REST_DNS_INTERFACE = "hbase.rest.dns.interface"; + + public static final String SCAN_START_ROW = "startrow"; + public static final String SCAN_END_ROW = "endrow"; + public static final String SCAN_COLUMN = "column"; + public static final String SCAN_START_TIME = "starttime"; + public static final String SCAN_END_TIME = "endtime"; + public static final String SCAN_MAX_VERSIONS = "maxversions"; + public static final String SCAN_BATCH_SIZE = "batchsize"; + public static final String SCAN_LIMIT = "limit"; + public static final String SCAN_FETCH_SIZE = "hbase.rest.scan.fetchsize"; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java index 98f48a7..82ccfa5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java @@ -78,12 +78,26 @@ public class MetricsREST { public void incrementSucessfulDeleteRequests(final int inc) { source.incrementSucessfulDeleteRequests(inc); } - + /** * @param inc How much to add to failedDeleteCount. */ public void incrementFailedDeleteRequests(final int inc) { source.incrementFailedDeleteRequests(inc); } - + + /** + * @param inc How much to add to sucessfulScanCount. + */ + public synchronized void incrementSucessfulScanRequests(final int inc) { + source.incrementSucessfulScanRequests(inc); + } + + /** + * @param inc How much to add to failedScanCount. + */ + public void incrementFailedScanRequests(final int inc) { + source.incrementFailedScanRequests(inc); + } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java new file mode 100644 index 0000000..8026ac5 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.rest; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.StreamingOutput; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.rest.model.CellModel; +import org.apache.hadoop.hbase.rest.model.CellSetModel; +import org.apache.hadoop.hbase.rest.model.RowModel; +import org.apache.hadoop.hbase.util.Bytes; + + +public class ProtobufStreamingUtil implements StreamingOutput { + + private static final Log LOG = LogFactory.getLog(ProtobufStreamingUtil.class); + private String contentType; + private ResultScanner resultScanner; + private int limit; + private int fetchSize; + + protected ProtobufStreamingUtil(ResultScanner scanner, String type, int limit, int fetchSize) { + this.resultScanner = scanner; + this.contentType = type; + this.limit = limit; + this.fetchSize = fetchSize; + LOG.debug("Created ScanStreamingUtil with content type = " + this.contentType + " user limit : " + + this.limit + " scan fetch size : " + this.fetchSize); + } + + @Override + public void write(OutputStream outStream) throws IOException, WebApplicationException { + Result[] rowsToSend; + if(limit < fetchSize){ + rowsToSend = this.resultScanner.next(limit); + writeToStream(createModelFromResults(rowsToSend), this.contentType, outStream); + } else { + int count = limit; + while (count > 0) { + if (count < fetchSize) { + rowsToSend = this.resultScanner.next(count); + } else { + rowsToSend = this.resultScanner.next(this.fetchSize); + } + if(rowsToSend.length == 0){ + break; + } + count = count - rowsToSend.length; + writeToStream(createModelFromResults(rowsToSend), this.contentType, outStream); + } + } + } + + private void writeToStream(CellSetModel model, String contentType, OutputStream outStream) + throws IOException { + byte[] objectBytes = model.createProtobufOutput(); + outStream.write(Bytes.toBytes((short)objectBytes.length)); + outStream.write(objectBytes); + outStream.flush(); + LOG.trace("Wrote " + model.getRows().size() + " rows to stream successfully."); + } + + private CellSetModel createModelFromResults(Result[] results) { + CellSetModel cellSetModel = new CellSetModel(); + for (Result rs : results) { + byte[] rowKey = rs.getRow(); + RowModel rModel = new RowModel(rowKey); + List kvs = rs.list(); + for (Cell kv : kvs) { + rModel.addCell(new CellModel(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv + .getValue())); + } + cellSetModel.addRow(rModel); + } + return cellSetModel; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java index c81b1eb..419640b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java @@ -21,14 +21,74 @@ package org.apache.hadoop.hbase.rest; import java.io.IOException; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; +import org.apache.hadoop.util.StringUtils; @InterfaceAudience.Private public class ResourceBase implements Constants { RESTServlet servlet; + Class accessDeniedClazz; public ResourceBase() throws IOException { servlet = RESTServlet.getInstance(); + try { + accessDeniedClazz = Class.forName("org.apache.hadoop.hbase.security.AccessDeniedException"); + } catch (ClassNotFoundException e) { + } + } + + protected Response processException(Throwable exp) { + Throwable curr = exp; + if(accessDeniedClazz != null) { + //some access denied exceptions are buried + while (curr != null) { + if(accessDeniedClazz.isAssignableFrom(curr.getClass())) { + throw new SecurityException("Unauthorized" + CRLF + + StringUtils.stringifyException(exp) + CRLF); + } + curr = curr.getCause(); + } + } + //TableNotFound may also be buried one level deep + if (exp instanceof TableNotFoundException || + exp.getCause() instanceof TableNotFoundException) { + throw new WebApplicationException( + Response.status(Response.Status.NOT_FOUND) + .type(MIMETYPE_TEXT).entity("Not found" + CRLF + + StringUtils.stringifyException(exp) + CRLF) + .build()); + } + if (exp instanceof NoSuchColumnFamilyException){ + throw new WebApplicationException( + Response.status(Response.Status.NOT_FOUND) + .type(MIMETYPE_TEXT).entity("Not found" + CRLF + + StringUtils.stringifyException(exp) + CRLF) + .build()); + } + if (exp instanceof RuntimeException) { + throw new WebApplicationException( + Response.status(Response.Status.BAD_REQUEST) + .type(MIMETYPE_TEXT).entity("Bad request" + CRLF + + StringUtils.stringifyException(exp) + CRLF) + .build()); + } + if (exp instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException retryException = + (RetriesExhaustedWithDetailsException) exp; + processException(retryException.getCause(0)); + } + throw new WebApplicationException( + Response.status(Response.Status.SERVICE_UNAVAILABLE) + .type(MIMETYPE_TEXT).entity("Unavailable" + CRLF + + StringUtils.stringifyException(exp) + CRLF) + .build()); } } + diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java index 5172fa3..4a32b9f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java @@ -20,18 +20,46 @@ package org.apache.hadoop.hbase.rest; import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; +import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.UriInfo; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.rest.model.CellModel; +import org.apache.hadoop.hbase.rest.model.RowModel; +import org.apache.hadoop.hbase.util.Bytes; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonProperty; @InterfaceAudience.Private public class TableResource extends ResourceBase { String table; + private static final Log LOG = LogFactory.getLog(TableResource.class); /** * Constructor @@ -91,4 +119,163 @@ public class TableResource extends ResourceBase { final @QueryParam("check") String check) throws IOException { return new RowResource(this, rowspec, versions, check); } + + @GET + @Produces({ Constants.MIMETYPE_XML, Constants.MIMETYPE_JSON }) + public CellSetModelStream get( + final @Context UriInfo uriInfo, + final @PathParam("table") String tableName, + final @HeaderParam("Accept") String contentType, + @DefaultValue(Integer.MAX_VALUE + "") @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit, + @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow, + @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow, + @DefaultValue("") @QueryParam(Constants.SCAN_COLUMN) List column, + @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions, + @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize, + @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime, + @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime, + @DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) { + servlet.getMetrics().incrementRequests(1); + try { + final Iterator itr = getScanner(tableName, Bytes.toBytes(startRow), + Bytes.toBytes(endRow), column, startTime, endTime, cacheBlocks, maxVersions, batchSize) + .iterator(); + final int rowsToSend = userRequestedLimit; + servlet.getMetrics().incrementSucessfulScanRequests(1); + return new CellSetModelStream(new ArrayList() { + public Iterator iterator() { + return new Iterator() { + int count = rowsToSend; + + @Override + public boolean hasNext() { + if (count > 0) { + return itr.hasNext(); + } else { + return false; + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException( + "Remove method cannot be used in CellSetModelStream"); + } + + @Override + public RowModel next() { + Result rs = itr.next(); + if ((rs == null) || (count <= 0)) { + return null; + } + byte[] rowKey = rs.getRow(); + RowModel rModel = new RowModel(rowKey); + List kvs = rs.list(); + for (Cell kv : kvs) { + rModel.addCell(new CellModel(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), + kv.getValue())); + } + count--; + return rModel; + } + }; + } + }); + } catch (Exception exp) { + servlet.getMetrics().incrementFailedScanRequests(1); + processException(exp); + LOG.warn(exp); + return null; + } + } + + @GET + @Produces({ Constants.MIMETYPE_PROTOBUF, Constants.MIMETYPE_PROTOBUF_IETF }) + public Response getProtobuf( + final @Context UriInfo uriInfo, + final @PathParam("table") String tableName, + final @HeaderParam("Accept") String contentType, + @DefaultValue(Integer.MAX_VALUE + "") @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit, + @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow, + @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow, + @DefaultValue("column") @QueryParam(Constants.SCAN_COLUMN) List column, + @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions, + @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize, + @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime, + @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime, + @DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) { + servlet.getMetrics().incrementRequests(1); + try { + int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10); + ProtobufStreamingUtil stream = new ProtobufStreamingUtil(getScanner(tableName, + Bytes.toBytes(startRow), Bytes.toBytes(endRow), column, startTime, endTime, cacheBlocks, + maxVersions, batchSize), contentType, userRequestedLimit, fetchSize); + servlet.getMetrics().incrementSucessfulScanRequests(1); + ResponseBuilder response = Response.ok(stream); + response.header("content-type", contentType); + return response.build(); + } catch (Exception exp) { + servlet.getMetrics().incrementFailedScanRequests(1); + processException(exp); + LOG.warn(exp); + return null; + } + } + + private ResultScanner getScanner(String tableName, byte[] startRow, byte[] endRow, + List columns, long startTime, long endTime, boolean cacheBlocks, int maxVersions, + int batchSize) throws IOException { + LOG.debug("Query parameters : Table Name = > " + tableName + " Start Row => " + + Bytes.toString(startRow) + " End Row => " + Bytes.toString(endRow) + " Columns => " + + columns + " Start Time => " + startTime + " End Time => " + endTime + " Cache Blocks => " + + cacheBlocks + " Max Versions => " + maxVersions + " Batch Size => " + batchSize); + HTableInterface table = RESTServlet.getInstance().getTable(tableName); + Scan tableScan = new Scan(); + tableScan.setBatch(batchSize); + tableScan.setMaxVersions(maxVersions); + tableScan.setTimeRange(startTime, endTime); + tableScan.setStartRow(startRow); + tableScan.setStopRow(endRow); + for (String csplit : columns) { + String[] familysplit = csplit.trim().split(":"); + if (familysplit.length == 2) { + if (familysplit[1].length() > 0) { + LOG.debug("Scan family and column : " + familysplit[0] + " " + familysplit[1]); + tableScan.addColumn(Bytes.toBytes(familysplit[0]), Bytes.toBytes(familysplit[1])); + } else { + tableScan.addFamily(Bytes.toBytes(familysplit[0])); + LOG.debug("Scan family : " + familysplit[0] + " and empty qualifier."); + tableScan.addColumn(Bytes.toBytes(familysplit[0]), null); + } + } else { + LOG.debug("Scan family : " + familysplit[0]); + tableScan.addFamily(Bytes.toBytes(familysplit[0])); + } + } + int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10); + tableScan.setCaching(fetchSize); + return table.getScanner(tableScan); + } + + @XmlRootElement(name = "CellSet") + @XmlAccessorType(XmlAccessType.FIELD) + public static class CellSetModelStream { + //JAXB needs an arraylist for streaming + @XmlElement(name="Row") + @JsonIgnore + private ArrayList Row; + + public CellSetModelStream() { + } + + public CellSetModelStream(final ArrayList rowList) { + this.Row = rowList; + } + + //jackson needs an iterator for streaming + @JsonProperty("Row") + public Iterator getIterator() { + return Row.iterator(); + } + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java index 3cb6fcd..77feb24 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.rest.client; import java.io.IOException; +import java.io.InputStream; import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -333,7 +334,8 @@ public class Client { int code = execute(c, method, headers, path); headers = method.getResponseHeaders(); byte[] body = method.getResponseBody(); - return new Response(code, headers, body); + InputStream in = method.getResponseBodyAsStream(); + return new Response(code, headers, body, in); } finally { method.releaseConnection(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java index 16ee9d2..c2810fe 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.rest.client; +import java.io.InputStream; + import org.apache.commons.httpclient.Header; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -32,6 +34,7 @@ public class Response { private int code; private Header[] headers; private byte[] body; + private InputStream stream; /** * Constructor @@ -61,6 +64,20 @@ public class Response { this.headers = headers; this.body = body; } + + /** + * Constructor + * @param code the HTTP response code + * @param headers headers the HTTP response headers + * @param body the response body, can be null + * @param in Inputstream if the response had one. + */ + public Response(int code, Header[] headers, byte[] body, InputStream in) { + this.code = code; + this.headers = headers; + this.body = body; + this.stream = in; + } /** * @return the HTTP response code @@ -68,6 +85,15 @@ public class Response { public int getCode() { return code; } + + /** + * Gets the input stream instance. + * + * @return an instance of InputStream class. + */ + public InputStream getStream(){ + return this.stream; + } /** * @return the HTTP response headers diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGzipFilter.java hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGzipFilter.java index ea0ad9b..3216ed0 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGzipFilter.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGzipFilter.java @@ -129,7 +129,7 @@ public class TestGzipFilter { String contentEncoding = response.getHeader("Content-Encoding"); assertTrue(contentEncoding == null || !contentEncoding.contains("gzip")); response = client.get("/" + TABLE, headers); - assertEquals(response.getCode(), 405); + assertEquals(response.getCode(), 406); contentEncoding = response.getHeader("Content-Encoding"); assertTrue(contentEncoding == null || !contentEncoding.contains("gzip")); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java index e932a80..7c1feef 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java @@ -73,7 +73,7 @@ public class TestScannerResource { private static int expectedRows2; private static Configuration conf; - private static int insertData(String tableName, String column, double prob) + static int insertData(Configuration conf, String tableName, String column, double prob) throws IOException { Random rng = new Random(); int count = 0; @@ -100,7 +100,7 @@ public class TestScannerResource { return count; } - private static int countCellSet(CellSetModel model) { + static int countCellSet(CellSetModel model) { int count = 0; Iterator rows = model.getRows().iterator(); while (rows.hasNext()) { @@ -170,8 +170,8 @@ public class TestScannerResource { htd.addFamily(new HColumnDescriptor(CFA)); htd.addFamily(new HColumnDescriptor(CFB)); admin.createTable(htd); - expectedRows1 = insertData(TABLE, COLUMN_1, 1.0); - expectedRows2 = insertData(TABLE, COLUMN_2, 0.5); + expectedRows1 = insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_1, 1.0); + expectedRows2 = insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_2, 0.5); } @AfterClass diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java new file mode 100644 index 0000000..2455768 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java @@ -0,0 +1,496 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.rest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import javax.ws.rs.core.MediaType; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.parsers.SAXParserFactory; +import javax.xml.stream.XMLStreamException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.rest.client.Client; +import org.apache.hadoop.hbase.rest.client.Cluster; +import org.apache.hadoop.hbase.rest.client.Response; +import org.apache.hadoop.hbase.rest.model.CellModel; +import org.apache.hadoop.hbase.rest.model.CellSetModel; +import org.apache.hadoop.hbase.rest.model.RowModel; +import org.apache.hadoop.hbase.rest.provider.JacksonProvider; +import org.apache.hadoop.hbase.util.Bytes; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonToken; +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.xml.sax.InputSource; +import org.xml.sax.XMLReader; + +@Category(MediumTests.class) +public class TestTableScan { + + private static final String TABLE = "TestScanResource"; + private static final String CFA = "a"; + private static final String CFB = "b"; + private static final String COLUMN_1 = CFA + ":1"; + private static final String COLUMN_2 = CFB + ":2"; + private static Client client; + private static int expectedRows1; + private static int expectedRows2; + private static Configuration conf; + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final HBaseRESTTestingUtility REST_TEST_UTIL = + new HBaseRESTTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + TEST_UTIL.startMiniCluster(); + REST_TEST_UTIL.startServletContainer(conf); + client = new Client(new Cluster().add("localhost", + REST_TEST_UTIL.getServletPort())); + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + if (!admin.tableExists(TABLE)) { + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE)); + htd.addFamily(new HColumnDescriptor(CFA)); + htd.addFamily(new HColumnDescriptor(CFB)); + admin.createTable(htd); + expectedRows1 = TestScannerResource.insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_1, + 1.0); + expectedRows2 = TestScannerResource.insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_2, + 0.5); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.getHBaseAdmin().disableTable(TABLE); + TEST_UTIL.getHBaseAdmin().deleteTable(TABLE); + REST_TEST_UTIL.shutdownServletContainer(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testSimpleScannerXML() throws IOException, JAXBException, XMLStreamException { + // Test scanning particular columns + StringBuilder builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1); + builder.append("&"); + builder.append(Constants.SCAN_LIMIT + "=10"); + Response response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_XML); + assertEquals(200, response.getCode()); + assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type")); + JAXBContext ctx = JAXBContext.newInstance(CellSetModel.class); + Unmarshaller ush = ctx.createUnmarshaller(); + CellSetModel model = (CellSetModel) ush.unmarshal(response.getStream()); + int count = TestScannerResource.countCellSet(model); + assertEquals(10, count); + checkRowsNotNull(model); + + //Test with no limit. + builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1); + response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_XML); + assertEquals(200, response.getCode()); + assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type")); + model = (CellSetModel) ush.unmarshal(response.getStream()); + count = TestScannerResource.countCellSet(model); + assertEquals(expectedRows1, count); + checkRowsNotNull(model); + + //Test with start and end row. + builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1); + builder.append("&"); + builder.append(Constants.SCAN_START_ROW + "=aaa"); + builder.append("&"); + builder.append(Constants.SCAN_END_ROW + "=aay"); + response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_XML); + assertEquals(200, response.getCode()); + model = (CellSetModel) ush.unmarshal(response.getStream()); + count = TestScannerResource.countCellSet(model); + RowModel startRow = model.getRows().get(0); + assertEquals("aaa", Bytes.toString(startRow.getKey())); + RowModel endRow = model.getRows().get(model.getRows().size() - 1); + assertEquals("aax", Bytes.toString(endRow.getKey())); + assertEquals(24, count); + checkRowsNotNull(model); + + //Test with start row and limit. + builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1); + builder.append("&"); + builder.append(Constants.SCAN_START_ROW + "=aaa"); + builder.append("&"); + builder.append(Constants.SCAN_LIMIT + "=15"); + response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_XML); + assertEquals(200, response.getCode()); + assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type")); + model = (CellSetModel) ush.unmarshal(response.getStream()); + startRow = model.getRows().get(0); + assertEquals("aaa", Bytes.toString(startRow.getKey())); + count = TestScannerResource.countCellSet(model); + assertEquals(15, count); + checkRowsNotNull(model); + + } + + @Test + public void testSimpleScannerJson() throws IOException, JAXBException { + // Test scanning particular columns with limit. + StringBuilder builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1); + builder.append("&"); + builder.append(Constants.SCAN_LIMIT + "=20"); + Response response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_JSON); + assertEquals(200, response.getCode()); + assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type")); + ObjectMapper mapper = new JacksonProvider() + .locateMapper(CellSetModel.class, MediaType.APPLICATION_JSON_TYPE); + CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class); + int count = TestScannerResource.countCellSet(model); + assertEquals(20, count); + checkRowsNotNull(model); + + //Test scanning with no limit. + builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_2); + response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_JSON); + assertEquals(200, response.getCode()); + assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type")); + model = mapper.readValue(response.getStream(), CellSetModel.class); + count = TestScannerResource.countCellSet(model); + assertEquals(expectedRows2, count); + checkRowsNotNull(model); + + //Test with start row and end row. + builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1); + builder.append("&"); + builder.append(Constants.SCAN_START_ROW + "=aaa"); + builder.append("&"); + builder.append(Constants.SCAN_END_ROW + "=aay"); + response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_JSON); + assertEquals(200, response.getCode()); + model = mapper.readValue(response.getStream(), CellSetModel.class); + RowModel startRow = model.getRows().get(0); + assertEquals("aaa", Bytes.toString(startRow.getKey())); + RowModel endRow = model.getRows().get(model.getRows().size() - 1); + assertEquals("aax", Bytes.toString(endRow.getKey())); + count = TestScannerResource.countCellSet(model); + assertEquals(24, count); + checkRowsNotNull(model); + } + + /** + * An example to scan using listener in unmarshaller for XML. + * @throws Exception the exception + */ + @Test + public void testScanUsingListenerUnmarshallerXML() throws Exception { + StringBuilder builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1); + builder.append("&"); + builder.append(Constants.SCAN_LIMIT + "=10"); + Response response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_XML); + assertEquals(200, response.getCode()); + assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type")); + JAXBContext context = JAXBContext.newInstance(ClientSideCellSetModel.class, RowModel.class, + CellModel.class); + Unmarshaller unmarshaller = context.createUnmarshaller(); + + final ClientSideCellSetModel.Listener listener = new ClientSideCellSetModel.Listener() { + @Override + public void handleRowModel(ClientSideCellSetModel helper, RowModel row) { + assertTrue(row.getKey() != null); + assertTrue(row.getCells().size() > 0); + } + }; + + // install the callback on all ClientSideCellSetModel instances + unmarshaller.setListener(new Unmarshaller.Listener() { + public void beforeUnmarshal(Object target, Object parent) { + if (target instanceof ClientSideCellSetModel) { + ((ClientSideCellSetModel) target).setCellSetModelListener(listener); + } + } + + public void afterUnmarshal(Object target, Object parent) { + if (target instanceof ClientSideCellSetModel) { + ((ClientSideCellSetModel) target).setCellSetModelListener(null); + } + } + }); + + // create a new XML parser + SAXParserFactory factory = SAXParserFactory.newInstance(); + factory.setNamespaceAware(true); + XMLReader reader = factory.newSAXParser().getXMLReader(); + reader.setContentHandler(unmarshaller.getUnmarshallerHandler()); + assertFalse(ClientSideCellSetModel.listenerInvoked); + reader.parse(new InputSource(response.getStream())); + assertTrue(ClientSideCellSetModel.listenerInvoked); + + } + + @Test + public void testStreamingJSON() throws Exception { + // Test scanning particular columns with limit. + StringBuilder builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1); + builder.append("&"); + builder.append(Constants.SCAN_LIMIT + "=20"); + Response response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_JSON); + assertEquals(200, response.getCode()); + assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type")); + ObjectMapper mapper = new JacksonProvider() + .locateMapper(CellSetModel.class, MediaType.APPLICATION_JSON_TYPE); + CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class); + int count = TestScannerResource.countCellSet(model); + assertEquals(20, count); + checkRowsNotNull(model); + + //Test scanning with no limit. + builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_2); + response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_JSON); + assertEquals(200, response.getCode()); + assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type")); + model = mapper.readValue(response.getStream(), CellSetModel.class); + count = TestScannerResource.countCellSet(model); + assertEquals(expectedRows2, count); + checkRowsNotNull(model); + + //Test with start row and end row. + builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1); + builder.append("&"); + builder.append(Constants.SCAN_START_ROW + "=aaa"); + builder.append("&"); + builder.append(Constants.SCAN_END_ROW + "=aay"); + response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_JSON); + assertEquals(200, response.getCode()); + + count = 0; + JsonFactory jfactory = new JsonFactory(mapper); + JsonParser jParser = jfactory.createJsonParser(response.getStream()); + boolean found = false; + while (jParser.nextToken() != JsonToken.END_OBJECT) { + if(jParser.getCurrentToken() == JsonToken.START_OBJECT && found) { + RowModel row = jParser.readValueAs(RowModel.class); + assertNotNull(row.getKey()); + for (int i = 0; i < row.getCells().size(); i++) { + if (count == 0) { + assertEquals("aaa", Bytes.toString(row.getKey())); + } + if (count == 23) { + assertEquals("aax", Bytes.toString(row.getKey())); + } + count++; + } + jParser.skipChildren(); + } else { + found = jParser.getCurrentToken() == JsonToken.START_ARRAY; + } + } + assertEquals(24, count); + } + + @Test + public void testSimpleScannerProtobuf() throws Exception { + StringBuilder builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1); + builder.append("&"); + builder.append(Constants.SCAN_LIMIT + "=15"); + Response response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_PROTOBUF); + assertEquals(200, response.getCode()); + assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type")); + int rowCount = readProtobufStream(response.getStream()); + assertEquals(15, rowCount); + + //Test with start row and end row. + builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1); + builder.append("&"); + builder.append(Constants.SCAN_START_ROW + "=aaa"); + builder.append("&"); + builder.append(Constants.SCAN_END_ROW + "=aay"); + response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_PROTOBUF); + assertEquals(200, response.getCode()); + assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type")); + rowCount = readProtobufStream(response.getStream()); + assertEquals(24, rowCount); + } + + private void checkRowsNotNull(CellSetModel model) { + for (RowModel row: model.getRows()) { + assertTrue(row.getKey() != null); + assertTrue(row.getCells().size() > 0); + } + } + + /** + * Read protobuf stream. + * @param inputStream the input stream + * @return The number of rows in the cell set model. + * @throws IOException Signals that an I/O exception has occurred. + */ + public int readProtobufStream(InputStream inputStream) throws IOException{ + DataInputStream stream = new DataInputStream(inputStream); + CellSetModel model = null; + int rowCount = 0; + try { + while (true) { + byte[] lengthBytes = new byte[2]; + int readBytes = stream.read(lengthBytes); + if (readBytes == -1) { + break; + } + assertEquals(2, readBytes); + int length = Bytes.toShort(lengthBytes); + byte[] cellset = new byte[length]; + stream.read(cellset); + model = new CellSetModel(); + model.getObjectFromMessage(cellset); + checkRowsNotNull(model); + rowCount = rowCount + TestScannerResource.countCellSet(model); + } + } catch (EOFException exp) { + exp.printStackTrace(); + } finally { + stream.close(); + } + return rowCount; + } + + @Test + public void testScanningUnknownColumnJson() throws IOException, JAXBException { + // Test scanning particular columns with limit. + StringBuilder builder = new StringBuilder(); + builder.append("?"); + builder.append(Constants.SCAN_COLUMN + "=a:test"); + Response response = client.get("/" + TABLE + builder.toString(), + Constants.MIMETYPE_JSON); + assertEquals(200, response.getCode()); + assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type")); + ObjectMapper mapper = new JacksonProvider().locateMapper(CellSetModel.class, + MediaType.APPLICATION_JSON_TYPE); + CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class); + int count = TestScannerResource.countCellSet(model); + assertEquals(0, count); + } + + /** + * The Class ClientSideCellSetModel which mimics cell set model, and contains listener to perform + * user defined operations on the row model. + */ + @XmlRootElement(name = "CellSet") + @XmlAccessorType(XmlAccessType.FIELD) + public static class ClientSideCellSetModel implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * This list is not a real list; instead it will notify a listener whenever JAXB has + * unmarshalled the next row. + */ + @XmlElement(name="Row") + private List row; + + static boolean listenerInvoked = false; + + /** + * Install a listener for row model on this object. If l is null, the listener + * is removed again. + */ + public void setCellSetModelListener(final Listener l) { + row = (l == null) ? null : new ArrayList() { + private static final long serialVersionUID = 1L; + + public boolean add(RowModel o) { + l.handleRowModel(ClientSideCellSetModel.this, o); + listenerInvoked = true; + return false; + } + }; + } + + /** + * This listener is invoked every time a new row model is unmarshalled. + */ + public static interface Listener { + void handleRowModel(ClientSideCellSetModel helper, RowModel rowModel); + } + } +} + + +