From 31778e2baf26dab2252fbc14aac7f9c6cafba03d Mon Sep 17 00:00:00 2001 From: Axel Hanikel Date: Thu, 18 Oct 2018 12:09:16 +0200 Subject: [PATCH] OAK-7849 - CommitHook for recording write operations to the segment store --- .../oak/segment/SegmentNodeStore.java | 21 +++ .../segment/SegmentNodeStoreRegistrar.java | 17 ++ .../oak/segment/tool/LoggingHook.java | 174 ++++++++++++++++++ 3 files changed, 212 insertions(+) create mode 100644 oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/LoggingHook.java diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java index 9f0c6d6518..432a515ee0 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collections; import java.util.Map; +import java.util.function.Consumer; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; @@ -37,9 +38,11 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; import org.apache.jackrabbit.oak.segment.scheduler.Commit; import org.apache.jackrabbit.oak.segment.scheduler.LockBasedScheduler; import org.apache.jackrabbit.oak.segment.scheduler.Scheduler; +import org.apache.jackrabbit.oak.segment.tool.LoggingHook; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.commit.CommitHook; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.CompositeHook; import org.apache.jackrabbit.oak.spi.commit.Observable; import org.apache.jackrabbit.oak.spi.commit.Observer; import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff; @@ -82,6 +85,8 @@ public class SegmentNodeStore implements NodeStore, Observable { @NotNull private StatisticsProvider statsProvider = StatisticsProvider.NOOP; + private LoggingHook loggingHook; + private SegmentNodeStoreBuilder( @NotNull Revisions revisions, @NotNull SegmentReader reader, @@ -111,6 +116,16 @@ public class SegmentNodeStore implements NodeStore, Observable { return this; } + /** + * {@link LoggingHook} for recording write operations to a log file + * @return this instance + */ + @NotNull + public SegmentNodeStoreBuilder withLoggingHook(Consumer writer) { + this.loggingHook = LoggingHook.newLoggingHook(writer); + return this; + } + @NotNull public SegmentNodeStore build() { checkState(!isCreated); @@ -157,6 +172,8 @@ public class SegmentNodeStore implements NodeStore, Observable { private final SegmentNodeStoreStats stats; + private final LoggingHook loggingHook; + private SegmentNodeStore(SegmentNodeStoreBuilder builder) { this.writer = builder.writer; this.blobStore = builder.blobStore; @@ -164,6 +181,7 @@ public class SegmentNodeStore implements NodeStore, Observable { this.scheduler = LockBasedScheduler.builder(builder.revisions, builder.reader, stats) .dispatchChanges(builder.dispatchChanges) .build(); + this.loggingHook = builder.loggingHook; } @Override @@ -187,6 +205,9 @@ public class SegmentNodeStore implements NodeStore, Observable { @NotNull CommitInfo info) throws CommitFailedException { checkArgument(builder instanceof SegmentNodeBuilder); checkArgument(((SegmentNodeBuilder) builder).isRootBuilder()); + if (loggingHook != null) { + commitHook = new CompositeHook(commitHook, loggingHook); + } return scheduler.schedule(new Commit(builder, commitHook, info)); } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java index 962cd5c3a1..495812519e 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java @@ -34,6 +34,10 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import com.google.common.io.Closer; +import java.io.BufferedOutputStream; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; import org.apache.jackrabbit.commons.SimpleValueFactory; import org.apache.jackrabbit.oak.api.Descriptors; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; @@ -61,6 +65,7 @@ import org.apache.jackrabbit.oak.segment.file.MetricsIOMonitor; import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.split.SplitPersistence; +import org.apache.jackrabbit.oak.segment.tool.LoggingHook; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo; @@ -364,6 +369,18 @@ class SegmentNodeStoreRegistrar { SegmentNodeStore.SegmentNodeStoreBuilder segmentNodeStoreBuilder = SegmentNodeStoreBuilders.builder(store).withStatisticsProvider(cfg.getStatisticsProvider()); segmentNodeStoreBuilder.dispatchChanges(cfg.dispatchChanges()); + final String loggingHookFileName = System.getProperty(LoggingHook.class.getName() + ".filename"); + if (loggingHookFileName != null && ! "".equals(loggingHookFileName)) { + final OutputStream os = new BufferedOutputStream(new FileOutputStream(loggingHookFileName)); + segmentNodeStoreBuilder.withLoggingHook(logMessage -> { + try { + os.write(logMessage.getBytes("UTF-8")); + } catch (UnsupportedEncodingException ex) { + } catch (IOException ex) { + } + }); + } + SegmentNodeStore segmentNodeStore = segmentNodeStoreBuilder.build(); if (cfg.isPrimarySegmentStore()) { diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/LoggingHook.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/LoggingHook.java new file mode 100644 index 0000000000..3d3b2d5aaf --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/LoggingHook.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.tool; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.function.Consumer; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.PropertyState; +import static org.apache.jackrabbit.oak.api.Type.BINARIES; +import static org.apache.jackrabbit.oak.api.Type.BINARY; +import static org.apache.jackrabbit.oak.api.Type.STRING; +import static org.apache.jackrabbit.oak.api.Type.STRINGS; +import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState; +import org.apache.jackrabbit.oak.spi.commit.CommitHook; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; + +public class LoggingHook implements CommitHook, NodeStateDiff { + + public static final int THREADWAITTIMEMILLIS = 1000; + + private final Consumer writer; + + private LoggingHook(final Consumer writer) { + this.writer = writer; + } + + public static LoggingHook newLoggingHook(final Consumer writer) { + return new LoggingHook(writer); + } + + public void enter(NodeState before, NodeState after) { + // do nothing + } + + public void leave(NodeState before, NodeState after) { + queueWriteLine("n!"); + } + + @Override + public boolean propertyAdded(PropertyState after) { + queueWriteLine("p+ " + toString(after)); + return true; + } + + @Override + public boolean propertyChanged(PropertyState before, PropertyState after) { + queueWriteLine("p^ " + toString(after)); + return true; + } + + @Override + public boolean propertyDeleted(PropertyState before) { + queueWriteLine("p- " + toString(before)); + return true; + } + + @Override + public boolean childNodeAdded(String name, NodeState after) { + queueWriteLine("n+ " + urlEncode(name)); + this.enter(null, after); + boolean ret = after.compareAgainstBaseState(EmptyNodeState.EMPTY_NODE, this); + this.leave(null, after); + return ret; + } + + @Override + public boolean childNodeChanged(String name, NodeState before, NodeState after) { + queueWriteLine("n^ " + urlEncode(name)); + this.enter(before, after); + boolean ret = after.compareAgainstBaseState(before, this); + this.leave(before, after); + return ret; + } + + @Override + public boolean childNodeDeleted(String name, NodeState before) { + queueWriteLine("n- " + urlEncode(name)); + return true; + } + + private static String toString(final PropertyState ps) { + final StringBuilder val = new StringBuilder(); // TODO: an output stream would certainly be better + val.append(urlEncode(ps.getName())); + val.append(" <"); + val.append(ps.getType()); + val.append("> "); + if (ps.getType() == BINARY) { + val.append("= "); + final Blob blob = ps.getValue(BINARY); + appendBlob(val, blob); + } else if (ps.getType() == BINARIES) { + val.append("= ["); + ps.getValue(BINARIES).forEach((Blob b) -> { + appendBlob(val, b); + val.append(','); + }); + replaceOrAppendLastChar(val, ',', ']'); + } else if (ps.isArray()) { + val.append("= ["); + ps.getValue(STRINGS).forEach((String s) -> { + val.append(urlEncode(s)); + val.append(','); + }); + replaceOrAppendLastChar(val, ',', ']'); + } else { + val.append("= ").append(urlEncode(ps.getValue(STRING))); + } + return val.toString(); + } + + private static String urlEncode(String s) { + String ret; + try { + ret = URLEncoder.encode(s, "UTF-8").replace("%2F", "/").replace("%3A", ":"); + } catch (UnsupportedEncodingException ex) { + ret = "ERROR: " + ex.toString(); + } + return ret; + } + + private static void replaceOrAppendLastChar(StringBuilder b, char oldChar, char newChar) { + if (b.charAt(b.length() - 1) == oldChar) { + b.setCharAt(b.length() - 1, newChar); + } else { + b.append(newChar); + } + } + + private void queueWriteLine(String s) { + writer.accept(System.currentTimeMillis() + " " + urlEncode(Thread.currentThread().getName()) + " " + s + "\n"); + } + + private static void appendBlob(StringBuilder sb, Blob blob) { + final InputStream is = blob.getNewStream(); + final char[] hex = "0123456789ABCDEF".toCharArray(); + int b; + try { + while ((b = is.read()) >= 0) { + sb.append(hex[b >> 4]); + sb.append(hex[b & 0x0f]); + } + } catch (IOException ex) { + throw new IllegalStateException(ex); + } + } + + @Override + public NodeState processCommit(NodeState before, NodeState after, CommitInfo info) throws CommitFailedException { + this.enter(before, after); + after.compareAgainstBaseState(before, this); + this.leave(before, after); + return after; + } +} -- 2.17.1