From afe51460b487318673a60e7489528aedd2b14bbd Mon Sep 17 00:00:00 2001 From: Axel Hanikel Date: Thu, 18 Oct 2018 12:09:16 +0200 Subject: [PATCH] OAK-7849 - CommitHook for recording write operations to the segment store --- .../oak/segment/SegmentNodeStore.java | 19 ++ .../oak/segment/tool/LoggingHook.java | 243 ++++++++++++++++++ 2 files changed, 262 insertions(+) create mode 100644 oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/LoggingHook.java diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java index 9f0c6d6518..f1edf2edc6 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStore.java @@ -27,7 +27,9 @@ import static org.apache.jackrabbit.oak.api.Type.STRING; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import org.apache.jackrabbit.oak.api.Blob; @@ -37,9 +39,11 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; import org.apache.jackrabbit.oak.segment.scheduler.Commit; import org.apache.jackrabbit.oak.segment.scheduler.LockBasedScheduler; import org.apache.jackrabbit.oak.segment.scheduler.Scheduler; +import org.apache.jackrabbit.oak.segment.tool.LoggingHook; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.commit.CommitHook; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.CompositeHook; import org.apache.jackrabbit.oak.spi.commit.Observable; import org.apache.jackrabbit.oak.spi.commit.Observer; import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff; @@ -180,6 +184,8 @@ public class SegmentNodeStore implements NodeStore, Observable { return scheduler.getHeadNodeState().getChildNode(ROOT); } + private static LoggingHook LOGGINGHOOK = null; + @NotNull @Override public NodeState merge( @@ -187,6 +193,19 @@ public class SegmentNodeStore implements NodeStore, Observable { @NotNull CommitInfo info) throws CommitFailedException { checkArgument(builder instanceof SegmentNodeBuilder); checkArgument(((SegmentNodeBuilder) builder).isRootBuilder()); + if ("true".equals(System.getProperty(LoggingHook.class.getName()))) { + if (LOGGINGHOOK == null) { + synchronized(this.getClass()) { + if (LOGGINGHOOK == null) { + LOGGINGHOOK = LoggingHook.newLoggingHook(); + } + } + } + final List hooks = new ArrayList(); + hooks.add(commitHook); + hooks.add(LOGGINGHOOK); + commitHook = CompositeHook.compose(hooks); + } return scheduler.schedule(new Commit(builder, commitHook, info)); } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/LoggingHook.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/LoggingHook.java new file mode 100644 index 0000000000..4c99e218e2 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/LoggingHook.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.tool; + +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.PropertyState; +import static org.apache.jackrabbit.oak.api.Type.BINARIES; +import static org.apache.jackrabbit.oak.api.Type.BINARY; +import static org.apache.jackrabbit.oak.api.Type.STRING; +import static org.apache.jackrabbit.oak.api.Type.STRINGS; +import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState; +import org.apache.jackrabbit.oak.spi.commit.CommitHook; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; + +public class LoggingHook implements CommitHook, NodeStateDiff { + + public static final String LOGFILENAME_PROPERTY = LoggingHook.class.getName() + ".filename"; + public static final String LOGFILENAME_DEFAULT = "/tmp/logginghook.log"; + public static final int THREADWAITTIMEMILLIS = 1000; + + private final Queue queue; + private final Thread writerThread; + private OutputStream log; + private volatile boolean isOperational; + + private LoggingHook() { + isOperational = true; + queue = new ConcurrentLinkedQueue<>(); + try { + String logFileName = System.getProperty(LOGFILENAME_PROPERTY); + if (logFileName == null || "".equals(logFileName)) { + logFileName = LOGFILENAME_DEFAULT; + } + log = new FileOutputStream(logFileName); + } + catch (FileNotFoundException e) { + isOperational = false; + } + writerThread = new Thread("LoggingHookWriter") { + @Override + public void run() { + for (;;) { + try { + Thread.sleep(THREADWAITTIMEMILLIS); + } + catch (InterruptedException ex) { + return; + } + String entry; + while ((entry = queue.poll()) != null) { + try { + log.write(entry.getBytes("UTF-8")); + } + catch (UnsupportedEncodingException ex) { + isOperational = false; + } + catch (IOException ex) { + isOperational = false; + } + }; + } + } + }; + } + + public static LoggingHook newLoggingHook() { + final LoggingHook h = new LoggingHook(); + h.writerThread.start(); + return h; + } + + public void enter(NodeState before, NodeState after) { + // do nothing + } + + public void leave(NodeState before, NodeState after) { + if (isOperational) { + queueWriteLine("n!"); + } + } + + @Override + public boolean propertyAdded(PropertyState after) { + if (isOperational) { + queueWriteLine("p+ " + toString(after)); + } + return isOperational; + } + + @Override + public boolean propertyChanged(PropertyState before, PropertyState after) { + if (isOperational) { + queueWriteLine("p^ " + toString(after)); + } + return isOperational; + } + + @Override + public boolean propertyDeleted(PropertyState before) { + if (isOperational) { + queueWriteLine("p- " + toString(before)); + } + return isOperational; + } + + @Override + public boolean childNodeAdded(String name, NodeState after) { + if (isOperational) { + queueWriteLine("n+ " + urlEncode(name)); + this.enter(null, after); + boolean ret = after.compareAgainstBaseState(EmptyNodeState.EMPTY_NODE, this); + this.leave(null, after); + return ret; + } + return isOperational; + } + + @Override + public boolean childNodeChanged(String name, NodeState before, NodeState after) { + if (isOperational) { + queueWriteLine("n^ " + urlEncode(name)); + this.enter(before, after); + boolean ret = after.compareAgainstBaseState(before, this); + this.leave(before, after); + return ret; + } + return false; + } + + @Override + public boolean childNodeDeleted(String name, NodeState before) { + if (isOperational) { + queueWriteLine("n- " + urlEncode(name)); + } + return isOperational; + } + + private static String toString(final PropertyState ps) { + final StringBuilder val = new StringBuilder(); // TODO: an output stream would certainly be better + val.append(urlEncode(ps.getName())); + val.append(" <"); + val.append(ps.getType()); + val.append("> "); + if (ps.getType() == BINARY) { + val.append("= "); + final Blob blob = ps.getValue(BINARY); + appendBlob(val, blob); + } + else if (ps.getType() == BINARIES) { + val.append("= ["); + ps.getValue(BINARIES).forEach((Blob b) -> { + appendBlob(val, b); + val.append(','); + }); + replaceOrAppendLastChar(val, ',', ']'); + } + else if (ps.isArray()) { + val.append("= ["); + ps.getValue(STRINGS).forEach((String s) -> { + val.append(urlEncode(s)); + val.append(','); + }); + replaceOrAppendLastChar(val, ',', ']'); + } + else { + val.append("= ").append(urlEncode(ps.getValue(STRING))); + } + return val.toString(); + } + + private static String urlEncode(String s) { + String ret; + try { + ret = URLEncoder.encode(s, "UTF-8").replace("%2F", "/").replace("%3A", ":"); + } + catch (UnsupportedEncodingException ex) { + ret = "ERROR: " + ex.toString(); + } + return ret; + } + + private static void replaceOrAppendLastChar(StringBuilder b, char oldChar, char newChar) { + if (b.charAt(b.length() - 1) == oldChar) { + b.setCharAt(b.length() - 1, newChar); + } + else { + b.append(newChar); + } + } + + private void queueWriteLine(String s) { + queue.add(System.currentTimeMillis() + " " + urlEncode(Thread.currentThread().getName()) + " " + s + "\n"); + } + + private static void appendBlob(StringBuilder sb, Blob blob) { + final InputStream is = blob.getNewStream(); + final char[] hex = "0123456789ABCDEF".toCharArray(); + int b; + try { + while ((b = is.read()) >= 0) { + sb.append(hex[b >> 4]); + sb.append(hex[b & 0x0f]); + } + } + catch (IOException ex) { + throw new IllegalStateException(ex); + } + } + + @Override + public NodeState processCommit(NodeState before, NodeState after, CommitInfo info) throws CommitFailedException { + this.enter(before, after); + after.compareAgainstBaseState(before, this); + this.leave(before, after); + return after; + } +} -- 2.17.1