Index: src/test/java/org/apache/jackrabbit/core/query/AbstractQueryTest.java
===================================================================
--- src/test/java/org/apache/jackrabbit/core/query/AbstractQueryTest.java	(revision 1092710)
+++ src/test/java/org/apache/jackrabbit/core/query/AbstractQueryTest.java	(working copy)
@@ -16,6 +16,8 @@
  */
 package org.apache.jackrabbit.core.query;
 
+import static javax.jcr.query.Query.JCR_SQL2;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -124,11 +126,11 @@
      * @return the elements of the iterator as an array of Nodes.
      */
     protected Node[] toArray(NodeIterator it) {
-        List nodes = new ArrayList();
+        List<Node> nodes = new ArrayList<Node>();
         while (it.hasNext()) {
             nodes.add(it.nextNode());
         }
-        return (Node[]) nodes.toArray(new Node[nodes.size()]);
+        return nodes.toArray(new Node[nodes.size()]);
     }
 
     /**
@@ -201,22 +203,22 @@
     protected void checkResult(NodeIterator result, Node[] nodes)
             throws RepositoryException {
         // collect paths
-        Set expectedPaths = new HashSet();
-        for (int i = 0; i < nodes.length; i++) {
-            expectedPaths.add(nodes[i].getPath());
+        Set<String> expectedPaths = new HashSet<String>();
+        for (Node n : nodes) {
+            expectedPaths.add(n.getPath());
         }
-        Set resultPaths = new HashSet();
+        Set<String> resultPaths = new HashSet<String>();
         while (result.hasNext()) {
             resultPaths.add(result.nextNode().getPath());
         }
         // check if all expected are in result
-        for (Iterator it = expectedPaths.iterator(); it.hasNext();) {
-            String path = (String) it.next();
+        for (Iterator<String> it = expectedPaths.iterator(); it.hasNext();) {
+            String path = it.next();
             assertTrue(path + " is not part of the result set", resultPaths.contains(path));
         }
         // check result does not contain more than expected
-        for (Iterator it = resultPaths.iterator(); it.hasNext();) {
-            String path = (String) it.next();
+        for (Iterator<String> it = resultPaths.iterator(); it.hasNext();) {
+            String path = it.next();
             assertTrue(path + " is not expected to be part of the result set", expectedPaths.contains(path));
         }
     }
@@ -255,6 +257,11 @@
             return qm.createQuery(statement, Query.XPATH).execute();
         }
     }
+    
+    protected QueryResult executeSQL2Query(String statement)
+            throws RepositoryException {
+        return qm.createQuery(statement, JCR_SQL2).execute();
+    }
 
     /**
      * Returns a reference to the underlying search index.
Index: src/test/java/org/apache/jackrabbit/core/query/SQL2OffsetLimitTest.java
===================================================================
--- src/test/java/org/apache/jackrabbit/core/query/SQL2OffsetLimitTest.java	(revision 0)
+++ src/test/java/org/apache/jackrabbit/core/query/SQL2OffsetLimitTest.java	(revision 0)
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.query;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.jcr.Node;
+import javax.jcr.RepositoryException;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryResult;
+import javax.jcr.query.Row;
+
+import org.apache.jackrabbit.commons.JcrUtils;
+
+/**
+ * Test case for queries with JCR_SQL2 having offset and limit clauses
+ * 
+ * Inspired by <a
+ * href="https://issues.apache.org/jira/browse/JCR-2830">JCR-2830</a>
+ * 
+ */
+public class SQL2OffsetLimitTest extends AbstractQueryTest {
+
+    private final List<String> c = Arrays.asList("a", "b", "c", "d", "e");
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        for (String s : c) {
+            testRootNode.addNode(s);
+        }
+        testRootNode.getSession().save();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        for (Node c : JcrUtils.getChildNodes(testRootNode)) {
+            testRootNode.getSession().removeItem(c.getPath());
+        }
+        testRootNode.getSession().save();
+        super.tearDown();
+    }
+
+    private Query newQuery() throws Exception {
+        return qm.createQuery("SELECT * FROM [nt:base] WHERE ISCHILDNODE(["
+                + testRoot + "])", Query.JCR_SQL2);
+    }
+
+    public void testNoConstraints() throws Exception {
+        List<String> expected = new ArrayList<String>(c);
+        Query q = newQuery();
+
+        List<String> out = qrToPaths(q.execute());
+        assertEquals(c.size(), out.size());
+        for (String s : out) {
+            assertTrue(expected.remove(s));
+        }
+        assertTrue(expected.isEmpty());
+    }
+
+    public void testLimitEqSize() throws Exception {
+        List<String> expected = new ArrayList<String>(c);
+        Query q = newQuery();
+        q.setOffset(0);
+        q.setLimit(c.size());
+
+        List<String> out = qrToPaths(q.execute());
+        assertEquals(c.size(), out.size());
+        for (String s : out) {
+            assertTrue(expected.remove(s));
+        }
+        assertTrue(expected.isEmpty());
+    }
+
+    public void testLimitGtSize() throws Exception {
+        List<String> expected = new ArrayList<String>(c);
+        Query q = newQuery();
+        q.setOffset(0);
+        q.setLimit(c.size() * 2);
+
+        List<String> out = qrToPaths(q.execute());
+        assertEquals(c.size(), out.size());
+        for (String s : out) {
+            assertTrue(expected.remove(s));
+        }
+        assertTrue(expected.isEmpty());
+    }
+
+    public void testOffsetEqSize() throws Exception {
+        Query q = newQuery();
+        q.setOffset(c.size() - 1);
+        List<String> out = qrToPaths(q.execute());
+        assertEquals(1, out.size());
+    }
+
+    public void testOffsetGtSize() throws Exception {
+        Query q = newQuery();
+        q.setOffset(c.size() * 2);
+        List<String> out = qrToPaths(q.execute());
+        assertTrue(out.isEmpty());
+    }
+
+    public void testSimplePagination() throws Exception {
+        List<String> expected = new ArrayList<String>(c);
+        Query q = newQuery();
+
+        for (int i = 0; i < c.size(); i++) {
+            q.setOffset(i);
+            q.setLimit(1);
+            List<String> out = qrToPaths(q.execute());
+            assertEquals(1, out.size());
+            assertTrue(expected.remove(out.get(0)));
+        }
+        assertTrue(expected.isEmpty());
+    }
+
+    public void test2BigPages() throws Exception {
+        List<String> expected = new ArrayList<String>(c);
+        Query q = newQuery();
+
+        int p1 = (int) (c.size() * 0.8);
+        int p2 = c.size() - p1;
+
+        q.setOffset(0);
+        q.setLimit(p1);
+        List<String> out1 = qrToPaths(q.execute());
+        assertEquals(p1, out1.size());
+        for (String s : out1) {
+            assertTrue(expected.remove(s));
+        }
+
+        q.setOffset(p1);
+        q.setLimit(p2);
+        List<String> out2 = qrToPaths(q.execute());
+        assertEquals(p2, out2.size());
+        for (String s : out2) {
+            assertTrue(expected.remove(s));
+        }
+
+        assertTrue(expected.isEmpty());
+    }
+
+    private List<String> qrToPaths(QueryResult qr) throws RepositoryException {
+        List<String> ret = new ArrayList<String>();
+        for (Row row : JcrUtils.getRows(qr)) {
+            Node n = row.getNode();
+            ret.add(n.getPath().replace(n.getParent().getPath() + "/", ""));
+        }
+        return ret;
+    }
+}
Index: src/main/java/org/apache/jackrabbit/core/query/lucene/LuceneQueryFactory.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/query/lucene/LuceneQueryFactory.java	(revision 1092736)
+++ src/main/java/org/apache/jackrabbit/core/query/lucene/LuceneQueryFactory.java	(working copy)
@@ -107,6 +107,7 @@
 import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
 
 /**
  * Factory that creates Lucene queries from QOM elements.
@@ -166,10 +167,27 @@
         this.primaryTypeField = nsMappings.translateName(JCR_PRIMARYTYPE);
     }
 
-    public List<Row> execute(
-            Map<String, PropertyValue> columns, Selector selector,
-            Constraint constraint) throws RepositoryException, IOException {
+    /**
+     * @param columns
+     * @param selector
+     * @param constraint
+     * @param hasOrderByClause
+     *            if <code>true</code> it means that the lqf should just let the
+     *            QueryEngine take care of sorting and applying applying offset
+     *            and limit constraints
+     * @param offsetIn used in pagination
+     * @param limitIn used in pagination
+     * @return a list of rows
+     * @throws RepositoryException
+     * @throws IOException
+     */
+    public List<Row> execute(Map<String, PropertyValue> columns,
+            Selector selector, Constraint constraint, boolean hasOrderByClause,
+            long offsetIn, long limitIn) throws RepositoryException, IOException {
         final IndexReader reader = index.getIndexReader(true);
+        final int offset = offsetIn < 0 ? 0 : (int)offsetIn;
+        final int limit = limitIn < 0 ? Integer.MAX_VALUE : (int)limitIn;
+        
         QueryHits hits = null;
         try {
             JackrabbitIndexSearcher searcher = new JackrabbitIndexSearcher(
@@ -190,27 +208,48 @@
                         constraint, Collections.singletonMap(name, type),
                         searcher, reader);
             }
+            
+            // TODO depending on the filters, you could push the offset info
+            // into the searcher            
+            hits = searcher.evaluate(qp.mainQuery, new Sort(), offset + limit);
 
             List<Row> rows = new ArrayList<Row>();
-            hits = searcher.evaluate(qp.mainQuery);
+            int currentNode = 0;
+            int addedNodes = 0;
             ScoreNode node = hits.nextScoreNode();
             while (node != null) {
+                Row row = null;
                 try {
-                    Row row = new SelectorRow(
-                            columns, evaluator, selector.getSelectorName(),
+                    row = new SelectorRow(columns, evaluator,
+                            selector.getSelectorName(),
                             session.getNodeById(node.getNodeId()),
                             node.getScore());
-                    if (filter.evaluate(row)) {
-                        rows.add(row);
-                    }
                 } catch (ItemNotFoundException e) {
                     // skip the node
                 }
+                if (row != null && filter.evaluate(row)) {
+                    // apply limit and offset rules locally if there are no
+                    // order by clauses
+                    if (hasOrderByClause) {
+                        rows.add(row);
+                    } else {
+                        if (currentNode >= offset
+                                && currentNode - offset < limit) {
+                            rows.add(row);
+                            addedNodes++;
+                        }
+                        currentNode++;
+                        // end the loop when going over the limit
+                        if (addedNodes == limit) {
+                            break;
+                        }
+                    }
+                }
                 node = hits.nextScoreNode();
             }
             return rows;
         } finally {
-            if(hits != null){
+            if (hits != null) {
                 hits.close();
             }
             Util.closeOrRelease(reader);
Index: src/main/java/org/apache/jackrabbit/core/query/lucene/join/QueryEngine.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/query/lucene/join/QueryEngine.java	(revision 1092719)
+++ src/main/java/org/apache/jackrabbit/core/query/lucene/join/QueryEngine.java	(working copy)
@@ -441,23 +441,35 @@
         String[] columnNames = columnMap.keySet().toArray(
                 new String[columnMap.size()]);
 
+        // if true it means that the LuceneQueryFactory should just let the
+        // QueryEngine take care of sorting and applying offset and limit
+        // constraints
+        boolean hasOrderByClause = orderings != null && orderings.length > 0;
+        RowIterator rows = null;
         try {
-            RowIterator rows = new RowIteratorAdapter(lqf.execute(columnMap,
-                    selector, constraint));
-            QueryResult result = new SimpleQueryResult(columnNames,
-                    selectorNames, rows);
-            return sort(result, orderings, evaluator, offset, limit);
+            rows = new RowIteratorAdapter(lqf.execute(columnMap, selector,
+                    constraint, hasOrderByClause, offset, limit));
         } catch (IOException e) {
             throw new RepositoryException("Failed to access the query index", e);
         } finally {
-            if (log.isDebugEnabled()) {
-                time = System.currentTimeMillis() - time;
-                log.debug(genString(printIndentation) + "SQL2 SELECT took "
-                        + time + " ms. selector: " + selector
-                        + ", columns: " + Arrays.toString(columnNames)
-                        + ", constraint: " + constraint);
-            }
+            log.debug(
+                    "{}SQL2 SELECT took {} ms. selector: {}, columns: {}, constraint: {}, offset {}, limit {}",
+                    new Object[] { genString(printIndentation),
+                            System.currentTimeMillis() - time, selector,
+                            Arrays.toString(columnNames), constraint, offset,
+                            limit });
         }
+        QueryResult result = new SimpleQueryResult(columnNames, selectorNames,
+                rows);
+        if (!hasOrderByClause) {
+            return result;
+        }
+        long timeSort = System.currentTimeMillis();
+        QueryResult sorted = sort(result, orderings, evaluator, offset, limit);
+
+        log.debug("{}SQL2 SORT took {} ms.", genString(printIndentation),
+                System.currentTimeMillis() - timeSort);
+        return sorted;
     }
 
     private Map<String, PropertyValue> getColumnMap(
Index: src/main/java/org/apache/jackrabbit/core/query/lucene/SortedLuceneQueryHits.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/query/lucene/SortedLuceneQueryHits.java	(revision 1092719)
+++ src/main/java/org/apache/jackrabbit/core/query/lucene/SortedLuceneQueryHits.java	(working copy)
@@ -80,7 +80,7 @@
     /**
      * The score docs.
      */
-    private final List<ScoreDoc> scoreDocs = new ArrayList<ScoreDoc>();
+    private List<ScoreDoc> scoreDocs = new ArrayList<ScoreDoc>();
 
     /**
      * The total number of hits.
@@ -88,10 +88,12 @@
     private int size;
 
     /**
-     * Number of hits to retrieve.
+     * Number of hits to be pre-fetched from the lucene index (will be around 2x resultFetchHint).
      */
     private int numHits;
 
+    private int offset = 0;
+
     /**
      * Creates a new <code>QueryHits</code> instance wrapping <code>hits</code>.
      *
@@ -99,7 +101,7 @@
      * @param searcher        the index searcher.
      * @param query           the query to execute.
      * @param sort            the sort criteria.
-     * @param resultFetchHint a hint on how many results should be fetched.
+     * @param resultFetchHint a hint on how many results should be pre-fetched from the lucene index.
      * @throws IOException if an error occurs while reading from the index.
      */
     public SortedLuceneQueryHits(IndexReader reader,
@@ -131,16 +133,15 @@
         if (++hitIndex >= size) {
             // no more score nodes
             return null;
-        } else if (hitIndex >= scoreDocs.size()) {
+        } else if (hitIndex - offset >= scoreDocs.size()) {
             // refill at least numHits or twice hitIndex
             this.numHits = Math.max(this.numHits, hitIndex * 2);
             getHits();
         }
-        ScoreDoc doc = scoreDocs.get(hitIndex);
+        ScoreDoc doc = scoreDocs.get(hitIndex - offset);
         String uuid = reader.document(doc.doc,
                 FieldSelectors.UUID).get(FieldNames.UUID);
-        NodeId id = new NodeId(uuid);
-        return new ScoreNode(id, doc.score, doc.doc);
+        return new ScoreNode(new NodeId(uuid), doc.score, doc.doc);
     }
 
     /**
@@ -159,8 +160,9 @@
         TopFieldCollector collector = TopFieldCollector.create(sort, numHits, false, true, false, false);
         searcher.search(query, collector);
         size = collector.getTotalHits();
-        ScoreDoc[] docs = collector.topDocs().scoreDocs;
-        scoreDocs.addAll(Arrays.asList(docs).subList(scoreDocs.size(), docs.length));
+        offset += scoreDocs.size();
+        ScoreDoc[] docs = collector.topDocs(offset, numHits).scoreDocs;
+        scoreDocs = Arrays.asList(docs);
         log.debug("getHits() {}/{}", scoreDocs.size(), numHits);
         // double hits for next round
         numHits *= 2;
