Index: src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (revision 588586) +++ src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (working copy) @@ -26,7 +26,7 @@ import org.apache.jackrabbit.core.journal.RecordConsumer; import org.apache.jackrabbit.core.journal.Record; import org.apache.jackrabbit.core.journal.JournalException; -import org.apache.jackrabbit.core.journal.FileRevision; +import org.apache.jackrabbit.core.journal.InstanceRevision; import org.apache.jackrabbit.core.nodetype.InvalidNodeTypeDefException; import org.apache.jackrabbit.core.nodetype.NodeTypeDef; import org.apache.jackrabbit.core.observation.EventState; @@ -50,7 +50,6 @@ import java.util.Iterator; import java.util.Set; import java.util.HashSet; -import java.io.File; /** * Default clustered node implementation. @@ -176,7 +175,7 @@ /** * Instance revision file. */ - private FileRevision instanceRevision; + private InstanceRevision instanceRevision; /** * Workspace name used when consuming records. @@ -227,10 +226,9 @@ throw new ClusterException(msg); } try { - instanceRevision = new FileRevision(new File(revisionName)); - journal = (Journal) jc.newInstance(); journal.init(clusterNodeId, clusterContext.getNamespaceResovler()); + instanceRevision = journal.getInstanceRevision(revisionName); journal.register(this); } catch (ConfigurationException e) { throw new ClusterException(e.getMessage(), e.getCause()); Index: src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (revision 588586) +++ src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (working copy) @@ -22,6 +22,7 @@ import java.util.Map; import java.util.HashMap; import java.util.Iterator; +import java.io.File; import java.io.InputStream; import org.slf4j.Logger; @@ -320,4 +321,11 @@ public NamespaceResolver getResolver() { return resolver; } -} \ No newline at end of file + + /* (non-Javadoc) + * @see org.apache.jackrabbit.core.journal.Journal#getInstanceRevision(java.lang.String) + */ + public InstanceRevision getInstanceRevision(String name) throws JournalException { + return new FileRevision(new File(name)); + } +} Index: src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java (revision 588586) +++ src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java (working copy) @@ -32,6 +32,7 @@ import java.sql.DriverManager; import java.sql.DatabaseMetaData; import java.sql.Statement; +import java.util.Calendar; /** * Database-based journal implementation. Stores records inside a database table named @@ -140,6 +141,12 @@ */ private PreparedStatement insertRevisionStmt; + private PreparedStatement selectMinLocalRevisionStmt; + private PreparedStatement cleanRevisionStmt; + private PreparedStatement getLocalRevisionStmt; + private PreparedStatement insertLocalRevisionStmt; + private PreparedStatement updateLocalRevisionStmt; + /** * Locked revision. */ @@ -151,6 +158,35 @@ private long reconnectTimeMs; /** + * Whether the revision table janitor is enabled. + */ + private boolean janitorEnabled = false; + + /** + * The sleep time of the revision table janitor in seconds, 1 day default. + */ + private int janitorSleep = 60 * 60 * 24; + + /** + * When is the first run of the janitor scheduled? + * Default at 03:00 hours. + */ + private Calendar janitorNextRun = Calendar.getInstance(); + { + if (janitorNextRun.get(Calendar.HOUR_OF_DAY) >= 3) { + janitorNextRun.add(Calendar.DAY_OF_MONTH, 1); + } + janitorNextRun.set(Calendar.HOUR_OF_DAY, 3); + janitorNextRun.set(Calendar.MINUTE, 0); + janitorNextRun.set(Calendar.SECOND, 0); + janitorNextRun.set(Calendar.MILLISECOND, 0); + } + + /** + * The instance that manages the journal and local revision. + */ + private RevisionTableJanitor janitor; + /** * SQL statement returning all revisions within a range. */ protected String selectRevisionsStmtSQL; @@ -170,6 +206,12 @@ */ protected String insertRevisionStmtSQL; + protected String selectMinLocalRevisionStmtSQL; + protected String cleanRevisionStmtSQL; + protected String getLocalRevisionStmtSQL; + protected String insertLocalRevisionStmtSQL; + protected String updateLocalRevisionStmtSQL; + /** * Schema object prefix, bean property. */ @@ -203,6 +245,9 @@ String msg = "Unable to create connection."; throw new JournalException(msg, e); } + + initJanitor(); + log.info("DatabaseJournal initialized."); } @@ -244,6 +289,28 @@ } /** + * Initialize the Janitor + * + * @throws JournalException on error + */ + protected void initJanitor() throws JournalException { + janitor = new RevisionTableJanitor(); + // Start the clean-up thread if necessary. + if (janitorEnabled) { + Thread t1 = new Thread(janitor, "ClusterRevisionJanitor"); + t1.setDaemon(true); + t1.start(); + } + } + + /* (non-Javadoc) + * @see org.apache.jackrabbit.core.journal.AbstractJournal#getInstanceRevision(java.lang.String) + */ + public InstanceRevision getInstanceRevision(String name) throws JournalException { + return janitor; + } + + /** * Creates a new database connection. This method is called inside * {@link #init(String, org.apache.jackrabbit.name.NamespaceResolver)} or * when a connection has been dropped and must be reacquired. Base @@ -432,6 +499,16 @@ selectGlobalStmt = null; close(insertRevisionStmt); insertRevisionStmt = null; + close(selectMinLocalRevisionStmt); + selectMinLocalRevisionStmt = null; + close(cleanRevisionStmt); + cleanRevisionStmt = null; + close(getLocalRevisionStmt); + getLocalRevisionStmt = null; + close(insertLocalRevisionStmt); + insertLocalRevisionStmt = null; + close(updateLocalRevisionStmt); + updateLocalRevisionStmt = null; close(connection); connection = null; @@ -658,6 +735,19 @@ "insert into " + schemaObjectPrefix + "JOURNAL" + "(REVISION_ID, JOURNAL_ID, PRODUCER_ID, REVISION_DATA) " + "values (?,?,?,?)"; + selectMinLocalRevisionStmtSQL = + "select MIN(REVISION_ID) from " + schemaObjectPrefix + "LOCAL_REVISIONS"; + cleanRevisionStmtSQL = + "delete from " + schemaObjectPrefix + "JOURNAL " + "where REVISION_ID < ?"; + getLocalRevisionStmtSQL = + "select REVISION_ID from " + schemaObjectPrefix + "LOCAL_REVISIONS " + + "where JOURNAL_ID = ?"; + insertLocalRevisionStmtSQL = + "insert into " + schemaObjectPrefix + "LOCAL_REVISIONS " + + "(REVISION_ID, JOURNAL_ID) values (?,?)"; + updateLocalRevisionStmtSQL = + "update " + schemaObjectPrefix + "LOCAL_REVISIONS " + + "set REVISION_ID = ? where JOURNAL_ID = ?"; } /** @@ -670,6 +760,11 @@ updateGlobalStmt = connection.prepareStatement(updateGlobalStmtSQL); selectGlobalStmt = connection.prepareStatement(selectGlobalStmtSQL); insertRevisionStmt = connection.prepareStatement(insertRevisionStmtSQL); + selectMinLocalRevisionStmt = connection.prepareStatement(selectMinLocalRevisionStmtSQL); + cleanRevisionStmt = connection.prepareStatement(cleanRevisionStmtSQL); + getLocalRevisionStmt = connection.prepareStatement(getLocalRevisionStmtSQL); + insertLocalRevisionStmt = connection.prepareStatement(insertLocalRevisionStmtSQL); + updateLocalRevisionStmt = connection.prepareStatement(updateLocalRevisionStmtSQL); } /** @@ -703,6 +798,18 @@ return reconnectDelayMs; } + public boolean getJanitorEnabled() { + return janitorEnabled; + } + + public int getJanitorSleep() { + return janitorSleep; + } + + public int getJanitorFirstRunHourOfDay() { + return janitorNextRun.get(Calendar.HOUR_OF_DAY); + } + /** * Bean setters */ @@ -733,4 +840,192 @@ public void setReconnectDelayMs(long reconnectDelayMs) { this.reconnectDelayMs = reconnectDelayMs; } + + public void setJanitorEnabled(boolean enabled) { + this.janitorEnabled = enabled; + } + + public void setJanitorSleep(int sleep) { + this.janitorSleep = sleep; + } + + public void setJanitorFirstRunHourOfDay(int hourOfDay) { + janitorNextRun = Calendar.getInstance(); + if (janitorNextRun.get(Calendar.HOUR_OF_DAY) >= hourOfDay) { + janitorNextRun.add(Calendar.DAY_OF_MONTH, 1); + } + janitorNextRun.set(Calendar.HOUR_OF_DAY, hourOfDay); + janitorNextRun.set(Calendar.MINUTE, 0); + janitorNextRun.set(Calendar.SECOND, 0); + janitorNextRun.set(Calendar.MILLISECOND, 0); + } + + + /** + * Class for maintaining the revision table. This is only useful if all + * JR information except the searchindex is in the database (i.e., nodetypes + * etc). In that case, revision data can be thrown away from the JOURNAL table. + *
+ * When a new clusternode is added, first the ClusterNode is initialized, which on + * its turn calls {@link #init} which adds a local revision 0 to the LOCAL_REVISIONS + * table. This prevents other janitor tasks from deleting newly created records. + * Then the search index is built, and finally the ClusterNode is started and all + * revisions > 0 are processed. This guarantees that the search index is in good + * shape. + */ + public class RevisionTableJanitor implements Runnable, InstanceRevision { + + /** + * The cached local revision of this cluster node. + */ + private long localRevision; + + /** + * Constructs a RevisionTableJanitor and initializes the local + * revision of this clusternode. + * + * @throws JournalException on error + */ + public RevisionTableJanitor() throws JournalException { + // Write an initial local revision to the database if there is not + // yet an entry for this clusternode. This is necessary to keep + // the search index in good shape if it needs rebuilding. + localRevision = initializeLocalRevision(0L); + log.info("Initialized local revision to " + localRevision); + } + + // ------------------------ Runnable -------------------------------- + + /* (non-Javadoc) + * @see java.lang.Runnable#run() + */ + public void run() { + while (true) { + try { + log.info("Next clean-up run scheduled at " + janitorNextRun.getTime()); + long nextSleep = janitorNextRun.getTimeInMillis() - System.currentTimeMillis(); + Thread.sleep(nextSleep); + cleanUpOldRevisions(); + janitorNextRun.add(Calendar.SECOND, janitorSleep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("Interrupted: stopping clean-up task."); + return; + } + } + } + + // ------------------------ InstanceRevision -------------------------------- + + /* (non-Javadoc) + * @see org.apache.jackrabbit.core.journal.InstanceRevision#get() + */ + public synchronized long get() { + return localRevision; + } + + /* (non-Javadoc) + * @see org.apache.jackrabbit.core.journal.InstanceRevision#set(long) + */ + public synchronized void set(long localRevision) { + // Update the cached value and the table with local revisions. + this.localRevision = localRevision; + try { + updateLocalRevisionStmt.clearParameters(); + updateLocalRevisionStmt.clearWarnings(); + updateLocalRevisionStmt.setLong(1, localRevision); + updateLocalRevisionStmt.setString(2, getId()); + updateLocalRevisionStmt.execute(); + } catch (SQLException e) { + log.warn("Failed to update local revision.", e); + close(true); + } + } + + // ------------------------ Local helper methods -------------------------------- + + /** + * Writes the local revision 0 to the database and returns this value if + * there is no entry yet. Otherwise, return the value stored in the database. + * + * @param revision the offset for the local revision counter + * @return the local revision + * @throws JournalException on error + */ + protected long initializeLocalRevision(long revision) throws JournalException { + try { + + // Check whether the connection is available + checkConnection(); + + // Check whether there is an entry in the database. + getLocalRevisionStmt.clearParameters(); + getLocalRevisionStmt.clearWarnings(); + getLocalRevisionStmt.setString(1, getId()); + getLocalRevisionStmt.execute(); + ResultSet rs = getLocalRevisionStmt.getResultSet(); + boolean exists = rs.next(); + if (exists) { + revision = rs.getLong(1); + } + rs.close(); + + // Insert + if (!exists) { + insertLocalRevisionStmt.clearParameters(); + insertLocalRevisionStmt.clearWarnings(); + insertLocalRevisionStmt.setLong(1, revision); + insertLocalRevisionStmt.setString(2, getId()); + insertLocalRevisionStmt.execute(); + } + + return revision; + + } catch (SQLException e) { + log.warn("Failed to initialize local revision.", e); + close(true); + throw new JournalException("Failed to initialize local revision", e); + } + } + + /** + * Cleans old revisions from the clustering table. It assumes + * that there already exists an entry for the local clusternode + * (it is created in {@link DatabaseJournal#initJanitor()}). + */ + protected void cleanUpOldRevisions() { + try { + long minRevision = 0; + + // Check whether the connection is available + checkConnection(); + + // Find the minimal local revision + selectMinLocalRevisionStmt.clearParameters(); + selectMinLocalRevisionStmt.clearWarnings(); + selectMinLocalRevisionStmt.execute(); + ResultSet rs = selectMinLocalRevisionStmt.getResultSet(); + boolean cleanUp = rs.next(); + if (cleanUp) { + minRevision = rs.getLong(1); + } + rs.close(); + + // Clean up if necessary: + if (cleanUp) { + cleanRevisionStmt.clearParameters(); + cleanRevisionStmt.clearWarnings(); + cleanRevisionStmt.setLong(1, minRevision); + cleanRevisionStmt.execute(); + if (log.isInfoEnabled()) { + log.info("Cleaned old revisions up to revision " + minRevision + "."); + } + } + + } catch (SQLException e) { + log.warn("Failed to clean up old revisions.", e); + close(true); + } + } + } } Index: src/main/java/org/apache/jackrabbit/core/journal/FileRevision.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/FileRevision.java (revision 588586) +++ src/main/java/org/apache/jackrabbit/core/journal/FileRevision.java (working copy) @@ -26,7 +26,7 @@ /** * Maintains a file-based revision counter with locking, assuring uniqueness. */ -public class FileRevision { +public class FileRevision implements InstanceRevision { /** * Logger. Index: src/main/java/org/apache/jackrabbit/core/journal/InstanceRevision.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/InstanceRevision.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/journal/InstanceRevision.java (revision 0) @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +/** + * + */ +public interface InstanceRevision { + + /** + * Return current instance revision. + * + * @return instance revision + * @throws JournalException if some error occurs + */ + public long get() throws JournalException; + + /** + * Set current instance revision. + * + * @param value new instance revision + * @throws JournalException if some error occurs + */ + public void set(long value) throws JournalException; +} \ No newline at end of file Property changes on: src\main\java\org\apache\jackrabbit\core\journal\InstanceRevision.java ___________________________________________________________________ Name: svn:keywords + LastChangedBy LastChangedDate LastChangedRevision HeadURL Name: svn:eol-style + native Index: src/main/java/org/apache/jackrabbit/core/journal/Journal.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/Journal.java (revision 588586) +++ src/main/java/org/apache/jackrabbit/core/journal/Journal.java (working copy) @@ -71,4 +71,13 @@ * Close this journal. This should release any resources still held by this journal. */ public void close(); + + /** + * Gets the instance that manages the instance revision. + * + * @param revisionName and identifier for the InstanceRevision manager + * @return the InstanceRevision manager + * @throws JournalException on error + */ + public InstanceRevision getInstanceRevision(String name) throws JournalException; } \ No newline at end of file Index: src/main/resources/org/apache/jackrabbit/core/journal/default.ddl =================================================================== --- src/main/resources/org/apache/jackrabbit/core/journal/default.ddl (revision 588586) +++ src/main/resources/org/apache/jackrabbit/core/journal/default.ddl (working copy) @@ -16,6 +16,7 @@ create unique index ${schemaObjectPrefix}JOURNAL_IDX on ${schemaObjectPrefix}JOURNAL (REVISION_ID) create table ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID BIGINT NOT NULL) create unique index ${schemaObjectPrefix}GLOBAL_REVISION_IDX on ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID) +create table ${schemaObjectPrefix}LOCAL_REVISIONS (JOURNAL_ID varchar(255) NOT NULL, REVISION_ID BIGINT NOT NULL) # Inserting the one and only revision counter record now helps avoiding race conditions insert into ${schemaObjectPrefix}GLOBAL_REVISION VALUES(0) Index: src/main/resources/org/apache/jackrabbit/core/journal/derby.ddl =================================================================== --- src/main/resources/org/apache/jackrabbit/core/journal/derby.ddl (revision 588586) +++ src/main/resources/org/apache/jackrabbit/core/journal/derby.ddl (working copy) @@ -16,6 +16,7 @@ create unique index ${schemaObjectPrefix}JOURNAL_IDX on ${schemaObjectPrefix}JOURNAL (REVISION_ID) create table ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID BIGINT NOT NULL) create unique index ${schemaObjectPrefix}GLOBAL_REVISION_IDX on ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID) +create table ${schemaObjectPrefix}LOCAL_REVISIONS (JOURNAL_ID varchar(255) NOT NULL, REVISION_ID BIGINT NOT NULL) # Inserting the one and only revision counter record now helps avoiding race conditions insert into ${schemaObjectPrefix}GLOBAL_REVISION VALUES(0) Index: src/main/resources/org/apache/jackrabbit/core/journal/h2.ddl =================================================================== --- src/main/resources/org/apache/jackrabbit/core/journal/h2.ddl (revision 588586) +++ src/main/resources/org/apache/jackrabbit/core/journal/h2.ddl (working copy) @@ -14,10 +14,11 @@ # limitations under the License. # # DDL script for the H2 database engine (http://www.h2database.com) -# +# set max_length_inplace_lob 4096 create table ${schemaObjectPrefix}JOURNAL (REVISION_ID bigint primary key, JOURNAL_ID varchar(255), PRODUCER_ID varchar(255), REVISION_DATA blob) create table ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID bigint primary key) +create table ${schemaObjectPrefix}LOCAL_REVISIONS (JOURNAL_ID varchar(255) NOT NULL, REVISION_ID bigint NOT NULL) -# Inserting the one and only revision counter record now helps avoiding race conditions +# Inserting the one and only revision counter record now helps avoiding race conditions insert into ${schemaObjectPrefix}GLOBAL_REVISION VALUES(0) Index: src/main/resources/org/apache/jackrabbit/core/journal/mssql.ddl =================================================================== --- src/main/resources/org/apache/jackrabbit/core/journal/mssql.ddl (revision 588586) +++ src/main/resources/org/apache/jackrabbit/core/journal/mssql.ddl (working copy) @@ -2,6 +2,7 @@ create unique index ${schemaObjectPrefix}JOURNAL_IDX on ${schemaObjectPrefix}JOURNAL (REVISION_ID) create table ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID BIGINT NOT NULL) create unique index ${schemaObjectPrefix}GLOBAL_REVISION_IDX on ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID) +create table ${schemaObjectPrefix}LOCAL_REVISIONS (JOURNAL_ID varchar(255) NOT NULL, REVISION_ID BIGINT NOT NULL) # Inserting the one and only revision counter record now helps avoiding race conditions insert into ${schemaObjectPrefix}GLOBAL_REVISION VALUES(0) Index: src/main/resources/org/apache/jackrabbit/core/journal/mysql.ddl =================================================================== --- src/main/resources/org/apache/jackrabbit/core/journal/mysql.ddl (revision 588586) +++ src/main/resources/org/apache/jackrabbit/core/journal/mysql.ddl (working copy) @@ -16,6 +16,7 @@ create unique index ${schemaObjectPrefix}JOURNAL_IDX on ${schemaObjectPrefix}JOURNAL (REVISION_ID) create table ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID BIGINT NOT NULL) create unique index ${schemaObjectPrefix}GLOBAL_REVISION_IDX on ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID) +create table ${schemaObjectPrefix}LOCAL_REVISIONS (JOURNAL_ID varchar(255) NOT NULL, REVISION_ID BIGINT NOT NULL) # Inserting the one and only revision counter record now helps avoiding race conditions insert into ${schemaObjectPrefix}GLOBAL_REVISION VALUES(0) Index: src/main/resources/org/apache/jackrabbit/core/journal/oracle.ddl =================================================================== --- src/main/resources/org/apache/jackrabbit/core/journal/oracle.ddl (revision 588586) +++ src/main/resources/org/apache/jackrabbit/core/journal/oracle.ddl (working copy) @@ -16,6 +16,7 @@ create unique index ${schemaObjectPrefix}JOURNAL_IDX on ${schemaObjectPrefix}JOURNAL (REVISION_ID) ${tableSpace} create table ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID number(20,0) NOT NULL) ${tableSpace} create unique index ${schemaObjectPrefix}GLOBAL_REVISION_IDX on ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID) ${tableSpace} +create table ${schemaObjectPrefix}LOCAL_REVISIONS (JOURNAL_ID varchar(255) NOT NULL, REVISION_ID number(20,0) NOT NULL) # Inserting the one and only revision counter record now helps avoiding race conditions insert into ${schemaObjectPrefix}GLOBAL_REVISION VALUES(0) Index: src/main/resources/org/apache/jackrabbit/core/journal/postgresql.ddl =================================================================== --- src/main/resources/org/apache/jackrabbit/core/journal/postgresql.ddl (revision 588586) +++ src/main/resources/org/apache/jackrabbit/core/journal/postgresql.ddl (working copy) @@ -16,6 +16,7 @@ create unique index ${schemaObjectPrefix}JOURNAL_IDX on ${schemaObjectPrefix}JOURNAL (REVISION_ID) create table ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID BIGINT NOT NULL) create unique index ${schemaObjectPrefix}GLOBAL_REVISION_IDX on ${schemaObjectPrefix}GLOBAL_REVISION (REVISION_ID) +create table ${schemaObjectPrefix}LOCAL_REVISIONS (JOURNAL_ID varchar(255) NOT NULL, REVISION_ID BIGINT NOT NULL) # Inserting the one and only revision counter record now helps avoiding race conditions insert into ${schemaObjectPrefix}GLOBAL_REVISION VALUES(0)