diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java index d4fd6cc..cf16925 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java @@ -21,6 +21,17 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobRequest; +import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobRequestEncoder; +import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobResponse; +import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadRequest; +import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadRequestEncoder; +import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadResponse; +import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentRequest; +import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentRequestEncoder; +import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentResponse; +import org.apache.jackrabbit.oak.segment.standby.codec.ResponseDecoder; + import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; @@ -33,19 +44,10 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.compression.SnappyFramedDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.CharsetUtil; -import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobRequest; -import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobRequestEncoder; -import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobResponse; -import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadRequest; -import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadRequestEncoder; -import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadResponse; -import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentRequest; -import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentRequestEncoder; -import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentResponse; -import org.apache.jackrabbit.oak.segment.standby.codec.ResponseDecoder; class StandbyClient implements AutoCloseable { @@ -77,7 +79,7 @@ class StandbyClient implements AutoCloseable { final SslContext sslContext; if (secure) { - sslContext = SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE); + sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslContext = null; } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java new file mode 100644 index 0000000..ec6e4e6 --- /dev/null +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.client; + +import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.StandardMBean; +import javax.net.ssl.SSLException; + +import com.google.common.base.Supplier; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.standby.jmx.ClientStandbyStatusMBean; +import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; +import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class StandbyClientSync implements ClientStandbyStatusMBean, Runnable, Closeable { + + public static final String CLIENT_ID_PROPERTY_NAME = "standbyID"; + + private static final Logger log = LoggerFactory.getLogger(StandbyClientSync.class); + + private final String host; + + private final int port; + + private final int readTimeoutMs; + + private final boolean autoClean; + + private final CommunicationObserver observer; + + private final boolean secure; + + private boolean active = false; + + private int failedRequests; + + private long lastSuccessfulRequest; + + private volatile String state; + + private final Object sync = new Object(); + + private final FileStore fileStore; + + private final AtomicBoolean running = new AtomicBoolean(true); + + private long syncStartTimestamp; + + private long syncEndTimestamp; + + public StandbyClientSync(String host, int port, FileStore store, boolean secure, int readTimeoutMs, boolean autoClean) throws SSLException { + this.state = STATUS_INITIALIZING; + this.lastSuccessfulRequest = -1; + this.syncStartTimestamp = -1; + this.syncEndTimestamp = -1; + this.failedRequests = 0; + this.host = host; + this.port = port; + this.secure = secure; + this.readTimeoutMs = readTimeoutMs; + this.autoClean = autoClean; + this.fileStore = store; + String s = System.getProperty(CLIENT_ID_PROPERTY_NAME); + this.observer = new CommunicationObserver((s == null || s.length() == 0) ? UUID.randomUUID().toString() : s); + + final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); + try { + jmxServer.registerMBean(new StandardMBean(this, ClientStandbyStatusMBean.class), new ObjectName(this.getMBeanName())); + } catch (Exception e) { + log.error("can register standby status mbean", e); + } + } + + public String getMBeanName() { + return StandbyStatusMBean.JMX_NAME + ",id=\"" + this.observer.getID() + "\""; + } + + @Override + public void close() { + stop(); + state = STATUS_CLOSING; + final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); + try { + jmxServer.unregisterMBean(new ObjectName(this.getMBeanName())); + } catch (Exception e) { + log.error("can unregister standby status mbean", e); + } + observer.unregister(); + state = STATUS_CLOSED; + } + + @Override + public void run() { + if (!isRunning()) { + // manually stopped + return; + } + + state = STATUS_STARTING; + + synchronized (sync) { + if (active) { + return; + } + state = STATUS_RUNNING; + active = true; + } + + try { + long startTimestamp = System.currentTimeMillis(); + try (StandbyClient client = new StandbyClient(observer.getID(), secure, readTimeoutMs)) { + client.connect(host, port); + + long sizeBefore = fileStore.getStats().getApproximateSize(); + new StandbyClientSyncExecution(fileStore, client, newRunningSupplier()).execute(); + long sizeAfter = fileStore.getStats().getApproximateSize(); + + if (autoClean && sizeBefore > 0) { + // if size gain is over 25% call cleanup + if (sizeAfter - sizeBefore > 0.25 * sizeBefore) { + log.info("Store size increased from {} to {}, will run cleanup.", humanReadableByteCount(sizeBefore), humanReadableByteCount(sizeAfter)); + fileStore.cleanup(); + } + } + } + this.failedRequests = 0; + this.syncStartTimestamp = startTimestamp; + this.syncEndTimestamp = System.currentTimeMillis(); + this.lastSuccessfulRequest = syncEndTimestamp / 1000; + } catch (Exception e) { + this.failedRequests++; + log.error("Failed synchronizing state.", e); + } finally { + synchronized (this.sync) { + this.active = false; + } + } + } + + private Supplier newRunningSupplier() { + return new Supplier() { + + @Override + public Boolean get() { + return running.get(); + } + + }; + } + + @Override + public String getMode() { + return "client: " + this.observer.getID(); + } + + @Override + public boolean isRunning() { + return running.get(); + } + + @Override + public void start() { + running.set(true); + state = STATUS_RUNNING; + } + + @Override + public void stop() { + running.set(false); + state = STATUS_STOPPED; + } + + @Override + public String getStatus() { + return this.state; + } + + @Override + public int getFailedRequests() { + return this.failedRequests; + } + + @Override + public int getSecondsSinceLastSuccess() { + if (this.lastSuccessfulRequest < 0) { + return -1; + } + return (int) (System.currentTimeMillis() / 1000 - this.lastSuccessfulRequest); + } + + @Override + public int calcFailedRequests() { + return this.getFailedRequests(); + } + + @Override + public int calcSecondsSinceLastSuccess() { + return this.getSecondsSinceLastSuccess(); + } + + @Override + public void cleanup() { + try { + fileStore.cleanup(); + } catch (IOException e) { + log.error("Error while cleaning up", e); + } + } + + @Override + public long getSyncStartTimestamp() { + return syncStartTimestamp; + } + + @Override + public long getSyncEndTimestamp() { + return syncEndTimestamp; + } + +} diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java new file mode 100644 index 0000000..5366168 --- /dev/null +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.client; + +import static com.google.common.collect.Maps.newHashMap; +import static com.google.common.collect.Sets.newHashSet; + +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import javax.annotation.Nonnull; + +import com.google.common.base.Supplier; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.SegmentNodeBuilder; +import org.apache.jackrabbit.oak.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates the algorithm for a single execution of the synchronization + * process between the primary and the standby instance. It also contains + * temporary state that is supposed to be used for the lifetime of a + * synchronization run. + */ +class StandbyClientSyncExecution { + + private static final Logger log = LoggerFactory.getLogger(StandbyClientSyncExecution.class); + + private final FileStore store; + + private final StandbyClient client; + + private final Supplier running; + + private final Set visited = newHashSet(); + + private final Map cache = newHashMap(); + + StandbyClientSyncExecution(FileStore store, StandbyClient client, Supplier running) { + this.store = store; + this.client = client; + this.running = running; + } + + void execute() throws Exception { + RecordId remoteHead = getHead(); + + if (remoteHead.equals(store.getHead().getRecordId())) { + return; + } + + long t = System.currentTimeMillis(); + SegmentNodeState before = store.getHead(); + SegmentNodeBuilder builder = before.builder(); + SegmentNodeState current = newSegmentNodeState(remoteHead); + compareAgainstBaseState(current, before, builder); + boolean ok = setHead(before, builder.getNodeState()); + log.debug("updated head state successfully: {} in {}ms.", ok, System.currentTimeMillis() - t); + } + + private RecordId getHead() throws Exception { + return RecordId.fromString(store, client.getHead()); + } + + private SegmentNodeState newSegmentNodeState(RecordId id) { + return store.getReader().readNode(id); + } + + private boolean setHead(@Nonnull SegmentNodeState expected, @Nonnull SegmentNodeState head) { + return store.getRevisions().setHead(expected.getRecordId(), head.getRecordId()); + } + + private boolean compareAgainstBaseState(SegmentNodeState current, SegmentNodeState before, SegmentNodeBuilder builder) throws Exception { + while (true) { + try { + return current.compareAgainstBaseState(before, new StandbyDiff(builder, store, client, running)); + } catch (SegmentNotFoundException e) { + log.info("Found missing segment {}", e.getSegmentId()); + copySegmentHierarchyFromPrimary(UUID.fromString(e.getSegmentId())); + } + } + } + + private void copySegmentHierarchyFromPrimary(UUID segmentId) throws Exception { + LinkedList batch = new LinkedList<>(); + + batch.offer(segmentId); + + while (batch.size() > 0) { + UUID current = batch.remove(); + + log.info("Loading segment {}", current); + Segment segment = copySegmentFromPrimary(current); + + log.info("Marking segment {} as loaded", current); + visited.add(current); + + if (!SegmentId.isDataSegmentId(current.getLeastSignificantBits())) { + continue; + } + + log.info("Inspecting segment {} for references", current); + for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) { + UUID referenced = segment.getReferencedSegmentId(i); + + if (visited.contains(referenced)) { + continue; + } + + log.info("Found reference from {} to {}", current, referenced); + if (SegmentId.isDataSegmentId(referenced.getLeastSignificantBits())) { + batch.add(referenced); + } else { + batch.addFirst(referenced); + } + } + } + } + + private Segment copySegmentFromPrimary(UUID uuid) throws Exception { + Segment result = cache.get(uuid); + + if (result != null) { + log.info("Segment {} was found in the local cache", uuid); + return result; + } + + byte[] data = client.getSegment(uuid.toString()); + + if (data == null) { + throw new IllegalStateException("Unable to read segment " + uuid); + } + + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + SegmentId segmentId = store.newSegmentId(msb, lsb); + store.writeSegment(segmentId, data, 0, data.length); + result = segmentId.getSegment(); + cache.put(uuid, result); + return result; + } + +} + diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbySync.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbySync.java deleted file mode 100644 index 5bfae0c..0000000 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbySync.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.jackrabbit.oak.segment.standby.client; - -import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; - -import java.io.Closeable; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.management.MBeanServer; -import javax.management.ObjectName; -import javax.management.StandardMBean; -import javax.net.ssl.SSLException; - -import com.google.common.base.Supplier; -import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.standby.jmx.ClientStandbyStatusMBean; -import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; -import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class StandbySync implements ClientStandbyStatusMBean, Runnable, Closeable { - - public static final String CLIENT_ID_PROPERTY_NAME = "standbyID"; - - private static final Logger log = LoggerFactory.getLogger(StandbySync.class); - - private final String host; - - private final int port; - - private final int readTimeoutMs; - - private final boolean autoClean; - - private final CommunicationObserver observer; - - private final boolean secure; - - private boolean active = false; - - private int failedRequests; - - private long lastSuccessfulRequest; - - private volatile String state; - - private final Object sync = new Object(); - - private final FileStore fileStore; - - private final AtomicBoolean running = new AtomicBoolean(true); - - private long syncStartTimestamp; - - private long syncEndTimestamp; - - public StandbySync(String host, int port, FileStore store, boolean secure, int readTimeoutMs, boolean autoClean) throws SSLException { - this.state = STATUS_INITIALIZING; - this.lastSuccessfulRequest = -1; - this.syncStartTimestamp = -1; - this.syncEndTimestamp = -1; - this.failedRequests = 0; - this.host = host; - this.port = port; - this.secure = secure; - this.readTimeoutMs = readTimeoutMs; - this.autoClean = autoClean; - this.fileStore = store; - String s = System.getProperty(CLIENT_ID_PROPERTY_NAME); - this.observer = new CommunicationObserver((s == null || s.length() == 0) ? UUID.randomUUID().toString() : s); - - final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); - try { - jmxServer.registerMBean(new StandardMBean(this, ClientStandbyStatusMBean.class), new ObjectName(this.getMBeanName())); - } catch (Exception e) { - log.error("can register standby status mbean", e); - } - } - - public String getMBeanName() { - return StandbyStatusMBean.JMX_NAME + ",id=\"" + this.observer.getID() + "\""; - } - - public void close() { - stop(); - state = STATUS_CLOSING; - final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); - try { - jmxServer.unregisterMBean(new ObjectName(this.getMBeanName())); - } catch (Exception e) { - log.error("can unregister standby status mbean", e); - } - observer.unregister(); - state = STATUS_CLOSED; - } - - public void run() { - if (!isRunning()) { - // manually stopped - return; - } - - state = STATUS_STARTING; - - synchronized (sync) { - if (active) { - return; - } - state = STATUS_RUNNING; - active = true; - } - - try { - long startTimestamp = System.currentTimeMillis(); - try (StandbyClient client = new StandbyClient(observer.getID(), secure, readTimeoutMs)) { - client.connect(host, port); - - long sizeBefore = fileStore.getStats().getApproximateSize(); - new StandbySyncExecution(fileStore, client, newRunningSupplier()).execute(); - long sizeAfter = fileStore.getStats().getApproximateSize(); - - if (autoClean && sizeBefore > 0) { - // if size gain is over 25% call cleanup - if (sizeAfter - sizeBefore > 0.25 * sizeBefore) { - log.info("Store size increased from {} to {}, will run cleanup.", humanReadableByteCount(sizeBefore), humanReadableByteCount(sizeAfter)); - fileStore.cleanup(); - } - } - } - this.failedRequests = 0; - this.syncStartTimestamp = startTimestamp; - this.syncEndTimestamp = System.currentTimeMillis(); - this.lastSuccessfulRequest = syncEndTimestamp / 1000; - } catch (Exception e) { - this.failedRequests++; - log.error("Failed synchronizing state.", e); - } finally { - synchronized (this.sync) { - this.active = false; - } - } - } - - private Supplier newRunningSupplier() { - return new Supplier() { - - @Override - public Boolean get() { - return running.get(); - } - - }; - } - - @Override - public String getMode() { - return "client: " + this.observer.getID(); - } - - @Override - public boolean isRunning() { - return running.get(); - } - - @Override - public void start() { - running.set(true); - state = STATUS_RUNNING; - } - - @Override - public void stop() { - running.set(false); - state = STATUS_STOPPED; - } - - @Override - public String getStatus() { - return this.state; - } - - @Override - public int getFailedRequests() { - return this.failedRequests; - } - - @Override - public int getSecondsSinceLastSuccess() { - if (this.lastSuccessfulRequest < 0) { - return -1; - } - return (int) (System.currentTimeMillis() / 1000 - this.lastSuccessfulRequest); - } - - @Override - public int calcFailedRequests() { - return this.getFailedRequests(); - } - - @Override - public int calcSecondsSinceLastSuccess() { - return this.getSecondsSinceLastSuccess(); - } - - @Override - public void cleanup() { - try { - fileStore.cleanup(); - } catch (IOException e) { - log.error("Error while cleaning up", e); - } - } - - @Override - public long getSyncStartTimestamp() { - return syncStartTimestamp; - } - - @Override - public long getSyncEndTimestamp() { - return syncEndTimestamp; - } - -} diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbySyncExecution.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbySyncExecution.java deleted file mode 100644 index 2672b16..0000000 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbySyncExecution.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jackrabbit.oak.segment.standby.client; - -import static com.google.common.collect.Maps.newHashMap; -import static com.google.common.collect.Sets.newHashSet; - -import java.util.LinkedList; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import javax.annotation.Nonnull; - -import com.google.common.base.Supplier; -import org.apache.jackrabbit.oak.segment.RecordId; -import org.apache.jackrabbit.oak.segment.Segment; -import org.apache.jackrabbit.oak.segment.SegmentId; -import org.apache.jackrabbit.oak.segment.SegmentNodeBuilder; -import org.apache.jackrabbit.oak.segment.SegmentNodeState; -import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; -import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Encapsulates the algorithm for a single execution of the synchronization - * process between the primary and the standby instance. It also contains - * temporary state that is supposed to be used for the lifetime of a - * synchronization run. - */ -class StandbySyncExecution { - - private static final Logger log = LoggerFactory.getLogger(StandbySyncExecution.class); - - private final FileStore store; - - private final StandbyClient client; - - private final Supplier running; - - private final Set visited = newHashSet(); - - private final Map cache = newHashMap(); - - StandbySyncExecution(FileStore store, StandbyClient client, Supplier running) { - this.store = store; - this.client = client; - this.running = running; - } - - void execute() throws Exception { - RecordId remoteHead = getHead(); - - if (remoteHead.equals(store.getHead().getRecordId())) { - return; - } - - long t = System.currentTimeMillis(); - SegmentNodeState before = store.getHead(); - SegmentNodeBuilder builder = before.builder(); - SegmentNodeState current = newSegmentNodeState(remoteHead); - compareAgainstBaseState(current, before, builder); - boolean ok = setHead(before, builder.getNodeState()); - log.debug("updated head state successfully: {} in {}ms.", ok, System.currentTimeMillis() - t); - } - - private RecordId getHead() throws Exception { - return RecordId.fromString(store, client.getHead()); - } - - private SegmentNodeState newSegmentNodeState(RecordId id) { - return store.getReader().readNode(id); - } - - private boolean setHead(@Nonnull SegmentNodeState expected, @Nonnull SegmentNodeState head) { - return store.getRevisions().setHead(expected.getRecordId(), head.getRecordId()); - } - - private boolean compareAgainstBaseState(SegmentNodeState current, SegmentNodeState before, SegmentNodeBuilder builder) throws Exception { - while (true) { - try { - return current.compareAgainstBaseState(before, new StandbyDiff(builder, store, client, running)); - } catch (SegmentNotFoundException e) { - log.info("Found missing segment {}", e.getSegmentId()); - copySegmentHierarchyFromPrimary(UUID.fromString(e.getSegmentId())); - } - } - } - - private void copySegmentHierarchyFromPrimary(UUID segmentId) throws Exception { - LinkedList batch = new LinkedList<>(); - - batch.offer(segmentId); - - while (batch.size() > 0) { - UUID current = batch.remove(); - - log.info("Loading segment {}", current); - Segment segment = copySegmentFromPrimary(current); - - log.info("Marking segment {} as loaded", current); - visited.add(current); - - if (!SegmentId.isDataSegmentId(current.getLeastSignificantBits())) { - continue; - } - - log.info("Inspecting segment {} for references", current); - for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) { - UUID referenced = segment.getReferencedSegmentId(i); - - if (visited.contains(referenced)) { - continue; - } - - log.info("Found reference from {} to {}", current, referenced); - if (SegmentId.isDataSegmentId(referenced.getLeastSignificantBits())) { - batch.add(referenced); - } else { - batch.addFirst(referenced); - } - } - } - } - - private Segment copySegmentFromPrimary(UUID uuid) throws Exception { - Segment result = cache.get(uuid); - - if (result != null) { - log.info("Segment {} was found in the local cache", uuid); - return result; - } - - byte[] data = client.getSegment(uuid.toString()); - - if (data == null) { - throw new IllegalStateException("Unable to read segment " + uuid); - } - - long msb = uuid.getMostSignificantBits(); - long lsb = uuid.getLeastSignificantBits(); - SegmentId segmentId = store.newSegmentId(msb, lsb); - store.writeSegment(segmentId, data, 0, data.length); - result = segmentId.getSegment(); - cache.put(uuid, result); - return result; - } - -} - diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java index 08f4b79..96dc0f0 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java @@ -19,17 +19,19 @@ package org.apache.jackrabbit.oak.segment.standby.server; -import java.io.Closeable; -import java.lang.management.ManagementFactory; import java.security.cert.CertificateException; import java.util.concurrent.TimeUnit; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import javax.management.StandardMBean; import javax.net.ssl.SSLException; +import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobResponseEncoder; +import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadResponseEncoder; +import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentResponseEncoder; +import org.apache.jackrabbit.oak.segment.standby.codec.RequestDecoder; +import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; @@ -43,67 +45,76 @@ import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.compression.SnappyFramedEncoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.util.CharsetUtil; -import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobResponseEncoder; -import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadResponseEncoder; -import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentResponseEncoder; -import org.apache.jackrabbit.oak.segment.standby.codec.RequestDecoder; -import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; -import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class StandbyServer implements StandbyStatusMBean, Closeable { +public class StandbyServer implements AutoCloseable { - private static final Logger log = LoggerFactory - .getLogger(StandbyServer.class); + private static final Logger log = LoggerFactory.getLogger(StandbyServer.class); private final int port; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; private final ServerBootstrap b; - private final CommunicationObserver observer; + private SslContext sslContext; private ChannelFuture channelFuture; - private boolean running; - private volatile String state; - - public StandbyServer(int port, final FileStore store) throws CertificateException, SSLException { - this(port, store, null, false); - } + public static class Builder { + private final int port; + private final StoreProvider storeProvider; - public StandbyServer(int port, final FileStore store, boolean secure) throws CertificateException, SSLException { - this(port, store, null, secure); - } + private boolean secure; + private String[] allowedClientIPRanges; + private StateConsumer stateConsumer; + private CommunicationObserver observer; - public StandbyServer(int port, final FileStore store, String[] allowedClientIPRanges) throws CertificateException, SSLException { - this(port, store, allowedClientIPRanges, false); + public Builder(final int port, final StoreProvider storeProvider) { + this.port = port; + this.storeProvider = storeProvider; + } + + public Builder secure(boolean secure) { + this.secure = secure; + return this; + } + + public Builder allowIPRanges(String[] alloallowedClientIPRanges) { + this.allowedClientIPRanges = alloallowedClientIPRanges; + + return this; + } + + public Builder withStateConsumer(StateConsumer stateConsumer) { + this.stateConsumer = stateConsumer; + + return this; + } + + public Builder withObserver(CommunicationObserver observer) { + this.observer = observer; + + return this; + } + + public StandbyServer build() throws CertificateException, SSLException { + return new StandbyServer(this); + } } - public StandbyServer(int port, final FileStore store, final String[] allowedClientIPRanges, boolean secure) throws CertificateException, SSLException { - this.port = port; + private StandbyServer(final Builder builder) throws CertificateException, SSLException { + this.port = builder.port; - if (secure) { + if (builder.secure) { SelfSignedCertificate ssc = new SelfSignedCertificate(); - sslContext = SslContext.newServerContext(ssc.certificate(), ssc.privateKey()); + sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } - observer = new CommunicationObserver("primary"); bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); - final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); - try { - jmxServer.registerMBean(new StandardMBean(this, StandbyStatusMBean.class), new ObjectName(this.getMBeanName())); - } - catch (Exception e) { - log.error("can't register standby status mbean", e); - } - b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); @@ -119,10 +130,10 @@ public class StandbyServer implements StandbyStatusMBean, Closeable { public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); - p.addLast(new ClientFilterHandler(new ClientIpFilter(allowedClientIPRanges))); + p.addLast(new ClientFilterHandler(new ClientIpFilter(builder.allowedClientIPRanges))); if (sslContext != null) { - p.addLast(sslContext.newHandler(ch.alloc())); + p.addLast("ssl", sslContext.newHandler(ch.alloc())); } // Decoders @@ -130,8 +141,8 @@ public class StandbyServer implements StandbyStatusMBean, Closeable { p.addLast(new LineBasedFrameDecoder(8192)); p.addLast(new StringDecoder(CharsetUtil.UTF_8)); p.addLast(new RequestDecoder()); - p.addLast(new StateHandler(newStateConsumer())); - p.addLast(new RequestObserverHandler(observer)); + p.addLast(new StateHandler(builder.stateConsumer)); + p.addLast(new RequestObserverHandler(builder.observer)); // Encoders @@ -139,120 +150,55 @@ public class StandbyServer implements StandbyStatusMBean, Closeable { p.addLast(new GetHeadResponseEncoder()); p.addLast(new GetSegmentResponseEncoder()); p.addLast(new GetBlobResponseEncoder()); - p.addLast(new ResponseObserverHandler(observer)); + p.addLast(new ResponseObserverHandler(builder.observer)); // Handlers - p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store))); - p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(store))); - p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(store))); + p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(builder.storeProvider.provideStore()))); + p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(builder.storeProvider.provideStore()))); + p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(builder.storeProvider.provideStore()))); } }); } - private StateConsumer newStateConsumer() { - return new StateConsumer() { - - @Override - public void consumeState(String state) { - StandbyServer.this.state = state; - } + public void start() { + channelFuture = b.bind(port); - }; + if (channelFuture.awaitUninterruptibly(10, TimeUnit.SECONDS)) { + onTimelyConnect(); + } else { + onConnectTimeOut(); + } } - - public String getMBeanName() { - return StandbyStatusMBean.JMX_NAME + ",id=" + this.port; + + public void stop() { + channelFuture.channel().disconnect(); } + @Override public void close() { stop(); - state = STATUS_CLOSING; - observer.unregister(); - final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); - try { - jmxServer.unregisterMBean(new ObjectName(this.getMBeanName())); - } catch (InstanceNotFoundException e) { - // ignore - } catch (Exception e) { - log.error("can't unregister standby status mbean", e); - } + if (bossGroup != null && !bossGroup.isShuttingDown()) { bossGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly(); } if (workerGroup != null && !workerGroup.isShuttingDown()) { workerGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly(); } - state = STATUS_CLOSED; - } - - @Override - public void start() { - if (running) { - return; - } - - state = STATUS_STARTING; - - channelFuture = b.bind(port); - - if (channelFuture.awaitUninterruptibly(10, TimeUnit.SECONDS)) { - onTimelyStart(); - } else { - onStartTimeOut(); - } } - private void onTimelyStart() { + private void onTimelyConnect() { if (channelFuture.isSuccess()) { - onSuccessfulStart(); + log.debug("Binding was successful"); } if (channelFuture.cause() != null) { - onUnsuccessfulStart(); + throw new RuntimeException(channelFuture.cause()); } } - private void onSuccessfulStart() { - log.debug("Binding was successful"); - state = STATUS_RUNNING; - running = true; - } - - private void onUnsuccessfulStart() { - log.debug("Binding was unsuccessful", channelFuture.cause()); - state = null; - running = false; - throw new RuntimeException(channelFuture.cause()); - } - - private void onStartTimeOut() { + private void onConnectTimeOut() { log.debug("Binding timed out, canceling"); - state = null; - running = false; channelFuture.cancel(true); } - - @Override - public String getMode() { - return "primary"; - } - - @Override - public boolean isRunning() { return running; } - - @Override - public void stop() { - if (running) { - running = false; - this.state = STATUS_STOPPED; - channelFuture.channel().disconnect(); - } - } - - @Override - public String getStatus() { - return state == null ? STATUS_INITIALIZING : state; - } - } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java new file mode 100644 index 0000000..147f129 --- /dev/null +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.standby.server; + +import java.io.Closeable; +import java.lang.management.ManagementFactory; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; +import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StandbyServerSync implements StandbyStatusMBean, StateConsumer, StoreProvider, Closeable { + private static final Logger log = LoggerFactory.getLogger(StandbyServer.class); + + private final FileStore fileStore; + private final CommunicationObserver observer; + private final int port; + private final String[] allowedClientIPRanges; + private final boolean secure; + + private volatile String state; + private final AtomicBoolean running = new AtomicBoolean(false); + private StandbyServer server; + + public StandbyServerSync(final int port, final FileStore fileStore) { + this(port, fileStore, null, false); + } + + public StandbyServerSync(final int port, final FileStore fileStore, final boolean secure) { + this(port, fileStore, null, secure); + } + + public StandbyServerSync(final int port, final FileStore fileStore, final String[] allowedClientIPRanges) { + this(port, fileStore, allowedClientIPRanges, false); + } + + public StandbyServerSync(final int port, final FileStore fileStore, final String[] allowedClientIPRanges, + final boolean secure) { + this.port = port; + this.fileStore = fileStore; + this.allowedClientIPRanges = allowedClientIPRanges; + this.secure = secure; + this.observer = new CommunicationObserver("primary"); + + final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); + try { + jmxServer.registerMBean(new StandardMBean(this, StandbyStatusMBean.class), + new ObjectName(this.getMBeanName())); + } catch (Exception e) { + log.error("can't register standby status mbean", e); + } + } + + @Override + public void consumeState(String state) { + this.state = state; + } + + @Override + public FileStore provideStore() { + return fileStore; + } + + @Override + public void start() { + if (isRunning()) { + return; + } + + state = STATUS_STARTING; + + try { + server = new StandbyServer.Builder(port, this) + .secure(secure) + .allowIPRanges(allowedClientIPRanges) + .withStateConsumer(this) + .withObserver(observer) + .build(); + server.start(); + + state = STATUS_RUNNING; + running.set(true); + } catch (Exception e) { + log.error("Server could not be started.", e); + state = null; + running.set(false); + } + } + + @Override + public void stop() { + if (server != null) { + server.stop(); + } + + running.set(false); + state = STATUS_STOPPED; + } + + @Override + public void close() { + stop(); + state = STATUS_CLOSING; + + observer.unregister(); + final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); + try { + jmxServer.unregisterMBean(new ObjectName(this.getMBeanName())); + } catch (InstanceNotFoundException e) { + // ignore + } catch (Exception e) { + log.error("can't unregister standby status mbean", e); + } + + state = STATUS_CLOSED; + } + + @Override + public String getMode() { + return "primary"; + } + + @Override + public String getStatus() { + return state == null ? STATUS_INITIALIZING : state; + } + + @Override + public boolean isRunning() { + return running.get(); + } + + public String getMBeanName() { + return StandbyStatusMBean.JMX_NAME + ",id=" + this.port; + } +} diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StoreProvider.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StoreProvider.java new file mode 100644 index 0000000..f20f78e --- /dev/null +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StoreProvider.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.standby.server; + +import org.apache.jackrabbit.oak.segment.file.FileStore; + +interface StoreProvider { + FileStore provideStore(); +} diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java index 8dc78ec..d73a1ff 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java @@ -38,8 +38,8 @@ import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.segment.SegmentStore; import org.apache.jackrabbit.oak.segment.SegmentStoreProvider; import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.osgi.framework.ServiceRegistration; import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; @@ -94,9 +94,9 @@ public class StandbyStoreService { private FileStore fileStore; - private StandbyServer primary = null; + private StandbyServerSync serverSync = null; - private StandbySync sync = null; + private StandbyClientSync clientSync = null; private ServiceRegistration syncReg = null; @@ -128,12 +128,14 @@ public class StandbyStoreService { @Deactivate public synchronized void deactivate() { - if (primary != null) { - primary.close(); + if (serverSync != null) { + serverSync.close(); } - if (sync != null) { - sync.close(); + + if (clientSync != null) { + clientSync.close(); } + if (syncReg != null) { syncReg.unregister(); } @@ -144,8 +146,8 @@ public class StandbyStoreService { int port = PropertiesUtil.toInteger(props.get(PORT), PORT_DEFAULT); String[] ranges = PropertiesUtil.toStringArray(props.get(ALLOWED_CLIENT_IP_RANGES), ALLOWED_CLIENT_IP_RANGES_DEFAULT); boolean secure = PropertiesUtil.toBoolean(props.get(SECURE), SECURE_DEFAULT); - primary = new StandbyServer(port, fileStore, ranges, secure); - primary.start(); + serverSync = new StandbyServerSync(port, fileStore, ranges, secure); + serverSync.start(); log.info("started primary on port {} with allowed ip ranges {}.", port, ranges); } @@ -158,14 +160,14 @@ public class StandbyStoreService { int readTimeout = PropertiesUtil.toInteger(props.get(READ_TIMEOUT), READ_TIMEOUT_DEFAULT); boolean clean = PropertiesUtil.toBoolean(props.get(AUTO_CLEAN), AUTO_CLEAN_DEFAULT); - sync = new StandbySync(host, port, fileStore, secure, readTimeout, clean); + clientSync = new StandbyClientSync(host, port, fileStore, secure, readTimeout, clean); Dictionary dictionary = new Hashtable(); dictionary.put("scheduler.period", interval); dictionary.put("scheduler.concurrent", false); // dictionary.put("scheduler.runOn", "SINGLE"); syncReg = context.getBundleContext().registerService( - Runnable.class.getName(), sync, dictionary); + Runnable.class.getName(), clientSync, dictionary); log.info("started standby sync with {}:{} at {} sec.", host, port, interval); } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkTest.java index 8295ff6..72e97c7 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkTest.java @@ -25,8 +25,8 @@ import static org.junit.Assert.assertFalse; import org.apache.jackrabbit.oak.segment.NetworkErrorProxy; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; @@ -121,13 +121,13 @@ public class BrokenNetworkTest extends TestBase { p.run(); NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); - final StandbyServer server = new StandbyServer(port, storeS, ssl); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, storeS, ssl); + serverSync.start(); addTestContent(store, "server"); storeS.flush(); // this speeds up the test a little bit... - StandbySync cl = newStandbySync(storeC, proxyPort, ssl); - cl.run(); + StandbyClientSync clientSync = newStandbyClientSync(storeC, proxyPort, ssl); + clientSync.run(); try { if (skipBytes > 0 || flipPosition >= 0) { @@ -139,12 +139,12 @@ public class BrokenNetworkTest extends TestBase { addTestContent(store, "server2"); storeS.flush(); } - cl.run(); + clientSync.run(); } assertEquals(storeS.getHead(), storeC.getHead()); } finally { - server.close(); - cl.close(); + serverSync.close(); + clientSync.close(); p.close(); } } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java index 94e91d7..e1f0819 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.util.Random; import java.util.concurrent.ScheduledExecutorService; -import com.google.common.io.ByteStreams; import org.apache.jackrabbit.core.data.FileDataStore; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; @@ -42,8 +41,8 @@ import org.apache.jackrabbit.oak.segment.NetworkErrorProxy; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -53,6 +52,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.io.ByteStreams; + public class DataStoreTestBase extends TestBase { protected boolean storesCanBeEqual = false; @@ -107,18 +108,18 @@ public class DataStoreTestBase extends TestBase { FileStore secondary = getSecondary(); NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); - final StandbyServer server = new StandbyServer(port, primary); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, primary); + serverSync.start(); byte[] data = addTestContent(store, "server", blobSize); primary.flush(); - StandbySync cl = newStandbySync(secondary); + StandbyClientSync cl = newStandbyClientSync(secondary); cl.run(); try { assertEquals(primary.getHead(), secondary.getHead()); } finally { - server.close(); + serverSync.close(); cl.close(); } @@ -183,13 +184,13 @@ public class DataStoreTestBase extends TestBase { p.run(); NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); - final StandbyServer server = new StandbyServer(port, primary); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, primary); + serverSync.start(); byte[] data = addTestContent(store, "server", blobSize); primary.flush(); - StandbySync cl = newStandbySync(secondary, proxyPort); - cl.run(); + StandbyClientSync clientSync = newStandbyClientSync(secondary, proxyPort); + clientSync.run(); try { if (skipBytes > 0 || flipPosition >= 0) { @@ -202,12 +203,12 @@ public class DataStoreTestBase extends TestBase { data = addTestContent(store, "server", blobSize); primary.flush(); } - cl.run(); + clientSync.run(); } assertEquals(primary.getHead(), secondary.getHead()); } finally { - server.close(); - cl.close(); + serverSync.close(); + clientSync.close(); p.close(); } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverIPRangeTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverIPRangeTest.java index a9de142..2a9f5c8 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverIPRangeTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverIPRangeTest.java @@ -24,8 +24,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; @@ -141,13 +141,13 @@ public class FailoverIPRangeTest extends TestBase { private void createTestWithConfig(String host, String[] ipRanges, boolean expectedToWork) throws Exception { NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); - final StandbyServer server = new StandbyServer(port, storeS, ipRanges); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, storeS, ipRanges); + serverSync.start(); addTestContent(store, "server"); storeS.flush(); // this speeds up the test a little bit... - StandbySync cl = new StandbySync(host, port, storeC, false, timeout, false); - cl.run(); + StandbyClientSync clientSync = new StandbyClientSync(host, port, storeC, false, timeout, false); + clientSync.run(); try { if (expectedToWork) { @@ -157,8 +157,8 @@ public class FailoverIPRangeTest extends TestBase { assertFalse("stores are equal but shouldn't!", storeS.getHead().equals(storeC.getHead())); } } finally { - server.close(); - cl.close(); + serverSync.close(); + clientSync.close(); } } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverMultipleClientsTestIT.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverMultipleClientsTestIT.java index 58bbeaa..af02aac 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverMultipleClientsTestIT.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverMultipleClientsTestIT.java @@ -23,8 +23,8 @@ import static org.junit.Assert.assertFalse; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; import org.apache.jackrabbit.oak.segment.SegmentTestUtils; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; @@ -45,13 +45,13 @@ public class FailoverMultipleClientsTestIT extends TestBase { @Test public void testMultipleClients() throws Exception { NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); - final StandbyServer server = new StandbyServer(port, storeS); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, storeS); + serverSync.start(); SegmentTestUtils.addTestContent(store, "server"); storeS.flush(); // this speeds up the test a little bit... - StandbySync cl1 = newStandbySync(storeC); - StandbySync cl2 = newStandbySync(storeC2); + StandbyClientSync cl1 = newStandbyClientSync(storeC); + StandbyClientSync cl2 = newStandbyClientSync(storeC2); try { assertFalse("first client has invalid initial store!", storeS.getHead().equals(storeC.getHead())); @@ -77,7 +77,7 @@ public class FailoverMultipleClientsTestIT extends TestBase { cl1.run(); assertEquals(storeS.getHead(), storeC.getHead()); } finally { - server.close(); + serverSync.close(); cl1.close(); cl2.close(); } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverSslTestIT.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverSslTestIT.java index 43b9f78..3e78e1b 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverSslTestIT.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverSslTestIT.java @@ -24,8 +24,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; @@ -47,19 +47,19 @@ public class FailoverSslTestIT extends TestBase { public void testFailoverSecure() throws Exception { NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); - final StandbyServer server = new StandbyServer(port, storeS, true); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, storeS, true); + serverSync.start(); addTestContent(store, "server"); storeS.flush(); // this speeds up the test a little bit... - StandbySync cl = newStandbySync(storeC, port, true); - cl.run(); + StandbyClientSync clientSync = newStandbyClientSync(storeC, port, true); + clientSync.run(); try { assertEquals(storeS.getHead(), storeC.getHead()); } finally { - server.close(); - cl.close(); + serverSync.close(); + clientSync.close(); } } @@ -67,19 +67,19 @@ public class FailoverSslTestIT extends TestBase { public void testFailoverSecureServerPlainClient() throws Exception { NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); - final StandbyServer server = new StandbyServer(port, storeS, true); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, storeS, true); + serverSync.start(); addTestContent(store, "server"); storeS.flush(); // this speeds up the test a little bit... - StandbySync cl = newStandbySync(storeC); - cl.run(); + StandbyClientSync clientSync = newStandbyClientSync(storeC); + clientSync.run(); try { assertFalse("stores are equal but shouldn't!", storeS.getHead().equals(storeC.getHead())); } finally { - server.close(); - cl.close(); + serverSync.close(); + clientSync.close(); } } @@ -87,19 +87,19 @@ public class FailoverSslTestIT extends TestBase { public void testFailoverPlainServerSecureClient() throws Exception { NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); - final StandbyServer server = new StandbyServer(port, storeS); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, storeS); + serverSync.start(); addTestContent(store, "server"); storeS.flush(); // this speeds up the test a little bit... - StandbySync cl = newStandbySync(storeC, port, true); - cl.run(); + StandbyClientSync clientSync = newStandbyClientSync(storeC, port, true); + clientSync.run(); try { assertFalse("stores are equal but shouldn't!", storeS.getHead().equals(storeC.getHead())); } finally { - server.close(); - cl.close(); + serverSync.close(); + clientSync.close(); } } } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/MBeanTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/MBeanTest.java index d2a098b..52400f5 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/MBeanTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/MBeanTest.java @@ -30,9 +30,9 @@ import java.util.Set; import javax.management.MBeanServer; import javax.management.ObjectName; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -51,8 +51,8 @@ public class MBeanTest extends TestBase { @Test public void testServerEmptyConfig() throws Exception { - final StandbyServer server = new StandbyServer(TestBase.port, this.storeS); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(TestBase.port, this.storeS); + serverSync.start(); final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); ObjectName status = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=*"); @@ -60,7 +60,7 @@ public class MBeanTest extends TestBase { Set instances = jmxServer.queryNames(status, null); assertEquals(1, instances.size()); status = instances.toArray(new ObjectName[0])[0]; - assertEquals(new ObjectName(server.getMBeanName()), status); + assertEquals(new ObjectName(serverSync.getMBeanName()), status); assertTrue(jmxServer.isRegistered(status)); assertEquals("primary", jmxServer.getAttribute(status, "Mode")); @@ -78,7 +78,7 @@ public class MBeanTest extends TestBase { assertEquals(true, jmxServer.getAttribute(status, "Running")); assertEquals(StandbyStatusMBean.STATUS_RUNNING, jmxServer.getAttribute(status, "Status")); } finally { - server.close(); + serverSync.close(); } assertTrue(!jmxServer.isRegistered(status)); @@ -86,9 +86,9 @@ public class MBeanTest extends TestBase { @Test public void testClientEmptyConfigNoServer() throws Exception { - final StandbySync client = newStandbySync(storeC); - client.start(); - client.run(); + final StandbyClientSync clientSync = newStandbyClientSync(storeC); + clientSync.start(); + clientSync.run(); final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); ObjectName status = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=*"); @@ -96,7 +96,7 @@ public class MBeanTest extends TestBase { Set instances = jmxServer.queryNames(status, null); assertEquals(1, instances.size()); status = instances.toArray(new ObjectName[0])[0]; - assertEquals(new ObjectName(client.getMBeanName()), status); + assertEquals(new ObjectName(clientSync.getMBeanName()), status); assertTrue(jmxServer.isRegistered(status)); String m = jmxServer.getAttribute(status, "Mode").toString(); @@ -115,7 +115,7 @@ public class MBeanTest extends TestBase { assertEquals(true, jmxServer.getAttribute(status, "Running")); assertEquals(StandbyStatusMBean.STATUS_RUNNING, jmxServer.getAttribute(status, "Status")); } finally { - client.close(); + clientSync.close(); } assertTrue(!jmxServer.isRegistered(status)); @@ -123,13 +123,13 @@ public class MBeanTest extends TestBase { @Test public void testClientNoServer() throws Exception { - System.setProperty(StandbySync.CLIENT_ID_PROPERTY_NAME, "Foo"); - final StandbySync client = newStandbySync(storeC); - client.start(); - client.run(); + System.setProperty(StandbyClientSync.CLIENT_ID_PROPERTY_NAME, "Foo"); + final StandbyClientSync clientSync = newStandbyClientSync(storeC); + clientSync.start(); + clientSync.run(); final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); - ObjectName status = new ObjectName(client.getMBeanName()); + ObjectName status = new ObjectName(clientSync.getMBeanName()); try { assertTrue(jmxServer.isRegistered(status)); assertEquals("client: Foo", jmxServer.getAttribute(status, "Mode")); @@ -140,7 +140,7 @@ public class MBeanTest extends TestBase { assertEquals("1", jmxServer.invoke(status, "calcFailedRequests", null, null).toString()); assertEquals("-1", jmxServer.invoke(status, "calcSecondsSinceLastSuccess", null, null).toString()); } finally { - client.close(); + clientSync.close(); } assertTrue(!jmxServer.isRegistered(status)); @@ -148,18 +148,18 @@ public class MBeanTest extends TestBase { @Test public void testClientAndServerEmptyConfig() throws Exception { - final StandbyServer server = new StandbyServer(port, this.storeS); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, this.storeS); + serverSync.start(); - System.setProperty(StandbySync.CLIENT_ID_PROPERTY_NAME, "Bar"); - final StandbySync client = newStandbySync(storeC); - client.start(); - client.run(); + System.setProperty(StandbyClientSync.CLIENT_ID_PROPERTY_NAME, "Bar"); + final StandbyClientSync clientSync = newStandbyClientSync(storeC); + clientSync.start(); + clientSync.run(); final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); ObjectName status = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=*"); - ObjectName clientStatus = new ObjectName(client.getMBeanName()); - ObjectName serverStatus = new ObjectName(server.getMBeanName()); + ObjectName clientStatus = new ObjectName(clientSync.getMBeanName()); + ObjectName serverStatus = new ObjectName(serverSync.getMBeanName()); try { Set instances = jmxServer.queryNames(status, null); assertEquals(3, instances.size()); @@ -224,8 +224,8 @@ public class MBeanTest extends TestBase { assertEquals(StandbyStatusMBean.STATUS_RUNNING, jmxServer.getAttribute(clientStatus, "Status")); } finally { - client.close(); - server.close(); + clientSync.close(); + serverSync.close(); } assertTrue(!jmxServer.isRegistered(clientStatus)); diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/RecoverTestIT.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/RecoverTestIT.java index fb2e25c..09aed58 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/RecoverTestIT.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/RecoverTestIT.java @@ -24,8 +24,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; @@ -49,19 +49,19 @@ public class RecoverTestIT extends TestBase { NodeStore store = SegmentNodeStoreBuilders.builder(storeC).build(); addTestContent(store, "client"); - final StandbyServer server = new StandbyServer(port, storeS); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, storeS); + serverSync.start(); store = SegmentNodeStoreBuilders.builder(storeS).build(); addTestContent(store, "server"); storeS.flush(); - StandbySync cl = newStandbySync(storeC); + StandbyClientSync cl = newStandbyClientSync(storeC); try { assertFalse("stores are not expected to be equal", storeS.getHead().equals(storeC.getHead())); cl.run(); assertEquals(storeS.getHead(), storeC.getHead()); } finally { - server.close(); + serverSync.close(); cl.close(); } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTest.java index fe57c31..2dcbb57 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTest.java @@ -28,15 +28,14 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.Random; -import com.google.common.io.ByteStreams; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -45,6 +44,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.io.ByteStreams; + public class StandbyTest extends TestBase { @Before @@ -65,19 +66,19 @@ public class StandbyTest extends TestBase { FileStore secondary = getSecondary(); NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); - final StandbyServer server = new StandbyServer(port, primary); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, primary); + serverSync.start(); byte[] data = addTestContent(store, "server", blobSize, 150); primary.flush(); - StandbySync cl = newStandbySync(secondary); - cl.run(); + StandbyClientSync clientSync = newStandbyClientSync(secondary); + clientSync.run(); try { assertEquals(primary.getHead(), secondary.getHead()); } finally { - server.close(); - cl.close(); + serverSync.close(); + clientSync.close(); } assertTrue(primary.getStats().getApproximateSize() > blobSize); diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestIT.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestIT.java index 8548c2e..174f112 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestIT.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestIT.java @@ -28,15 +28,14 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.Random; -import com.google.common.io.ByteStreams; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -45,6 +44,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.io.ByteStreams; + public class StandbyTestIT extends TestBase { @Before @@ -91,28 +92,28 @@ public class StandbyTestIT extends TestBase { FileStore secondary = getSecondary(); NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); - final StandbyServer server = new StandbyServer(port, primary); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, primary); + serverSync.start(); byte[] data = addTestContent(store, "server", blobSize, dataNodes); primary.flush(); - StandbySync cl = newStandbySync(secondary); + StandbyClientSync clientSync = newStandbyClientSync(secondary); try { for (int i = 0; i < 5; i++) { String cp = store.checkpoint(Long.MAX_VALUE); primary.flush(); - cl.run(); + clientSync.run(); assertEquals(primary.getHead(), secondary.getHead()); assertTrue(store.release(cp)); - cl.cleanup(); + clientSync.cleanup(); assertTrue(secondary.getStats().getApproximateSize() > blobSize); } } finally { - server.close(); - cl.close(); + serverSync.close(); + clientSync.close(); } assertTrue(primary.getStats().getApproximateSize() > blobSize); diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java index 2e9b728..ead8689 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java @@ -30,7 +30,7 @@ import org.apache.commons.lang3.SystemUtils; import org.apache.jackrabbit.oak.commons.CIHelper; import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider; import org.junit.BeforeClass; import org.junit.Rule; @@ -149,16 +149,16 @@ public class TestBase { return timeout; } - public StandbySync newStandbySync(FileStore store) throws Exception { - return newStandbySync(store, port, false); + public StandbyClientSync newStandbyClientSync(FileStore store) throws Exception { + return newStandbyClientSync(store, port, false); } - public StandbySync newStandbySync(FileStore store, int port) throws Exception { - return newStandbySync(store, port, false); + public StandbyClientSync newStandbyClientSync(FileStore store, int port) throws Exception { + return newStandbyClientSync(store, port, false); } - public StandbySync newStandbySync(FileStore store, int port, boolean secure) throws Exception { - return new StandbySync(LOCALHOST, port, store, secure, timeout, false); + public StandbyClientSync newStandbyClientSync(FileStore store, int port, boolean secure) throws Exception { + return new StandbyClientSync(LOCALHOST, port, store, secure, timeout, false); } } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BenchmarkBase.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BenchmarkBase.java index 3cc8ea5..718f6e6 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BenchmarkBase.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BenchmarkBase.java @@ -29,7 +29,7 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider; public class BenchmarkBase { @@ -97,16 +97,16 @@ public class BenchmarkBase { return newFileStore(directory, executor); } - public StandbySync newStandbyClient(FileStore store) throws Exception { - return newStandbyClient(store, port, false); + public StandbyClientSync newStandbyClientSync(FileStore store) throws Exception { + return newStandbyClientSync(store, port, false); } - public StandbySync newStandbyClient(FileStore store, int port) throws Exception { - return newStandbyClient(store, port, false); + public StandbyClientSync newStandbyClientSync(FileStore store, int port) throws Exception { + return newStandbyClientSync(store, port, false); } - public StandbySync newStandbyClient(FileStore store, int port, boolean secure) throws Exception { - return new StandbySync(LOCALHOST, port, store, secure, timeout, false); + public StandbyClientSync newStandbyClientSync(FileStore store, int port, boolean secure) throws Exception { + return new StandbyClientSync(LOCALHOST, port, store, secure, timeout, false); } private static File createTmpTargetDir(String name) throws IOException { diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BulkTransferBenchmark.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BulkTransferBenchmark.java index 9f5ae12..67e09d9 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BulkTransferBenchmark.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/benchmark/BulkTransferBenchmark.java @@ -27,9 +27,9 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; -import org.apache.jackrabbit.oak.segment.standby.client.StandbySync; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -91,19 +91,19 @@ public class BulkTransferBenchmark extends BenchmarkBase { store.merge(rootbuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); storeS.flush(); - final StandbyServer server = new StandbyServer(port, storeS, useSSL); - server.start(); + final StandbyServerSync serverSync = new StandbyServerSync(port, storeS, useSSL); + serverSync.start(); - System.setProperty(StandbySync.CLIENT_ID_PROPERTY_NAME, "Bar"); - StandbySync cl = newStandbyClient(storeC, port, useSSL); + System.setProperty(StandbyClientSync.CLIENT_ID_PROPERTY_NAME, "Bar"); + StandbyClientSync clientSync = newStandbyClientSync(storeC, port, useSSL); final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); ObjectName status = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=*"); - ObjectName clientStatus = new ObjectName(cl.getMBeanName()); - ObjectName serverStatus = new ObjectName(server.getMBeanName()); + ObjectName clientStatus = new ObjectName(clientSync.getMBeanName()); + ObjectName serverStatus = new ObjectName(serverSync.getMBeanName()); long start = System.currentTimeMillis(); - cl.run(); + clientSync.run(); try { Set instances = jmxServer.queryNames(status, null); @@ -118,8 +118,8 @@ public class BulkTransferBenchmark extends BenchmarkBase { System.out.println("did transfer " + segments + " segments with " + bytes + " bytes in " + (System.currentTimeMillis() - start) / 1000 + " seconds."); } finally { - server.close(); - cl.close(); + serverSync.close(); + clientSync.close(); } }