Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java (working copy) @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; + +class DefaultStandbyBlobReader implements StandbyBlobReader { + + private final FileStore store; + + DefaultStandbyBlobReader(FileStore store) { + this.store = store; + } + + @Override + public Blob readBlob(String blobId) { + BlobStore blobStore = store.getBlobStore(); + + if (blobStore != null) { + return new BlobStoreBlob(blobStore, blobId); + } + + return null; + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java (working copy) @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.file.FileStore; + +class DefaultStandbyHeadReader implements StandbyHeadReader { + + private final FileStore store; + + DefaultStandbyHeadReader(FileStore store) { + this.store = store; + } + + @Override + public RecordId readHeadRecordId() { + return store.getHead().getRecordId(); + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java (working copy) @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class DefaultStandbySegmentReader implements StandbySegmentReader { + + private static final Logger log = LoggerFactory.getLogger(DefaultStandbySegmentReader.class); + + private final FileStore store; + + DefaultStandbySegmentReader(FileStore store) { + this.store = store; + } + + @Override + public Segment readSegment(UUID uuid) { + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + + SegmentId id = store.newSegmentId(msb, lsb); + + for (int i = 0; i < 10; i++) { + try { + return store.readSegment(id); + } catch (SegmentNotFoundException e) { + log.warn("Unable to read segment, waiting...", e); + } + + try { + TimeUnit.MILLISECONDS.sleep(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } + + return null; + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java (working copy) @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.jackrabbit.oak.api.Blob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class GetBlobRequestHandler extends SimpleChannelInboundHandler { + + private static final Logger log = LoggerFactory.getLogger(GetBlobRequestHandler.class); + + private final StandbyBlobReader reader; + + GetBlobRequestHandler(StandbyBlobReader reader) { + this.reader = reader; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, GetBlobRequest msg) throws Exception { + log.debug("Reading blob {} for client {}", msg.getBlobId(), msg.getClientId()); + + Blob blob = reader.readBlob(msg.getBlobId()); + + if (blob == null) { + log.debug("Blob {} not found, discarding request from client {}", msg.getBlobId(), msg.getClientId()); + return; + } + + ctx.writeAndFlush(new GetBlobResponse(msg.getClientId(), blob)); + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java (working copy) @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import org.apache.jackrabbit.oak.api.Blob; + +class GetBlobResponse { + + private final String clientId; + + private final Blob blob; + + GetBlobResponse(String clientId, Blob blob) { + this.clientId = clientId; + this.blob = blob; + } + + String getClientId() { + return clientId; + } + + Blob getBlob() { + return blob; + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java (working copy) @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import java.io.InputStream; +import java.nio.charset.Charset; + +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import org.apache.commons.io.IOUtils; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.segment.standby.codec.Messages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class GetBlobResponseEncoder extends MessageToByteEncoder { + + private static final Logger log = LoggerFactory.getLogger(GetBlobResponseEncoder.class); + + @Override + protected void encode(ChannelHandlerContext ctx, GetBlobResponse msg, ByteBuf out) throws Exception { + log.debug("Sending blob {} to client {}", msg.getBlob().getContentIdentity(), msg.getClientId()); + encode(msg.getBlob(), out); + } + + private void encode(Blob b, ByteBuf out) throws Exception { + byte[] bytes; + + try (InputStream s = b.getNewStream()) { + bytes = IOUtils.toByteArray(s); + } + + Hasher hasher = Hashing.murmur3_32().newHasher(); + long hash = hasher.putBytes(bytes).hash().padToLong(); + + out.writeInt(bytes.length); + out.writeByte(Messages.HEADER_BLOB); + + String bid = b.getContentIdentity(); + byte[] id = bid.getBytes(Charset.forName("UTF-8")); + out.writeInt(id.length); + out.writeBytes(id); + + out.writeLong(hash); + out.writeBytes(bytes); + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java (working copy) @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles 'get head' requests and produces 'get head' responses. A response is + * generated only iff the record ID of the head root state can be read. + */ +class GetHeadRequestHandler extends SimpleChannelInboundHandler { + + private static final Logger log = LoggerFactory.getLogger(GetHeadRequest.class); + + private final StandbyHeadReader reader; + + GetHeadRequestHandler(StandbyHeadReader reader) { + this.reader = reader; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, GetHeadRequest msg) throws Exception { + log.debug("Reading head for client {}", msg.getClientId()); + + RecordId id = reader.readHeadRecordId(); + + if (id == null) { + log.debug("Head not found, discarding request from client {}", msg.getClientId()); + return; + } + + ctx.writeAndFlush(new GetHeadResponse(msg.getClientId(), id)); + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java (working copy) @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import org.apache.jackrabbit.oak.segment.RecordId; + +class GetHeadResponse { + + private final String clientId; + + private final RecordId headRecordId; + + GetHeadResponse(String clientId, RecordId headRecordId) { + this.clientId = clientId; + this.headRecordId = headRecordId; + } + + String getClientId() { + return clientId; + } + + RecordId getHeadRecordId() { + return headRecordId; + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java (working copy) @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.CharsetUtil; +import org.apache.jackrabbit.oak.segment.standby.codec.Messages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encodes a 'get head' response. + */ +class GetHeadResponseEncoder extends MessageToByteEncoder { + + private static final Logger log = LoggerFactory.getLogger(GetHeadResponseEncoder.class); + + @Override + protected void encode(ChannelHandlerContext ctx, GetHeadResponse msg, ByteBuf out) throws Exception { + log.debug("Sending head {} to client {}", msg.getHeadRecordId(), msg.getClientId()); + byte[] body = msg.getHeadRecordId().toString().getBytes(CharsetUtil.UTF_8); + out.writeInt(body.length + 1); + out.writeByte(Messages.HEADER_RECORD); + out.writeBytes(body); + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java (revision 1760352) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java (working copy) @@ -17,13 +17,15 @@ package org.apache.jackrabbit.oak.segment.standby.server; +import java.util.UUID; + class GetSegmentRequest { private final String clientId; - private final String segmentId; + private final UUID segmentId; - GetSegmentRequest(String clientId, String segmentId) { + GetSegmentRequest(String clientId, UUID segmentId) { this.clientId = clientId; this.segmentId = segmentId; } @@ -32,7 +34,7 @@ return clientId; } - public String getSegmentId() { + public UUID getSegmentId() { return segmentId; } Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java (working copy) @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.jackrabbit.oak.segment.Segment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class GetSegmentRequestHandler extends SimpleChannelInboundHandler { + + private static final Logger log = LoggerFactory.getLogger(GetSegmentRequestHandler.class); + + private final StandbySegmentReader reader; + + GetSegmentRequestHandler(StandbySegmentReader reader) { + this.reader = reader; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, GetSegmentRequest msg) throws Exception { + log.debug("Reading segment {} for client {}", msg.getSegmentId(), msg.getClientId()); + + Segment segment = reader.readSegment(msg.getSegmentId()); + + if (segment == null) { + log.debug("Segment {} not found, discarding request from client {}", msg.getSegmentId(), msg.getClientId()); + return; + } + + ctx.writeAndFlush(new GetSegmentResponse(msg.getClientId(), segment)); + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java (working copy) @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import org.apache.jackrabbit.oak.segment.Segment; + +class GetSegmentResponse { + + private final String clientId; + + private final Segment segment; + + GetSegmentResponse(String clientId, Segment segment) { + this.clientId = clientId; + this.segment = segment; + } + + String getClientId() { + return clientId; + } + + Segment getSegment() { + return segment; + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java (working copy) @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import java.io.ByteArrayOutputStream; + +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.standby.codec.Messages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encodes a 'get segment' response. + */ +class GetSegmentResponseEncoder extends MessageToByteEncoder { + + private static final Logger log = LoggerFactory.getLogger(GetSegmentResponseEncoder.class); + + private static final int EXTRA_HEADERS_LEN = 29; + + private static final int EXTRA_HEADERS_WO_SIZE = EXTRA_HEADERS_LEN - 4; + + @Override + protected void encode(ChannelHandlerContext ctx, GetSegmentResponse msg, ByteBuf out) throws Exception { + log.debug("Sending segment {} to client {}", msg.getSegment().getSegmentId(), msg.getClientId()); + encode(msg.getSegment(), out); + } + + private void encode(Segment s, ByteBuf out) throws Exception { + SegmentId id = s.getSegmentId(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(s.size()); + s.writeTo(baos); + byte[] segment = baos.toByteArray(); + + Hasher hasher = Hashing.murmur3_32().newHasher(); + long hash = hasher.putBytes(segment).hash().padToLong(); + + int len = segment.length + EXTRA_HEADERS_WO_SIZE; + out.writeInt(len); + out.writeByte(Messages.HEADER_SEGMENT); + out.writeLong(id.getMostSignificantBits()); + out.writeLong(id.getLeastSignificantBits()); + out.writeLong(hash); + out.writeBytes(segment); + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java (revision 1760352) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java (working copy) @@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.segment.standby.server; import java.util.List; +import java.util.UUID; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; @@ -45,7 +46,7 @@ out.add(new GetHeadRequest(Messages.extractClientFrom(msg))); } else if (request.startsWith(Messages.GET_SEGMENT)) { log.debug("Parsed 'get segment' message"); - out.add(new GetSegmentRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_SEGMENT.length()))); + out.add(new GetSegmentRequest(Messages.extractClientFrom(msg), UUID.fromString(request.substring(Messages.GET_SEGMENT.length())))); } else if (request.startsWith(Messages.GET_BLOB)) { log.debug("Parsed 'get blob' message"); out.add(new GetBlobRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_BLOB.length()))); Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java (working copy) @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import java.net.InetSocketAddress; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; + +/** + * Notifies an observer when a valid request has been received and parsed by + * this server. + */ +class RequestObserverHandler extends ChannelInboundHandlerAdapter { + + private final CommunicationObserver observer; + + RequestObserverHandler(CommunicationObserver observer) { + this.observer = observer; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); + + if (msg instanceof GetHeadRequest) { + onGetHeadRequest((GetHeadRequest) msg, address); + } else if (msg instanceof GetSegmentRequest) { + onGetSegmentRequest((GetSegmentRequest) msg, address); + } else if (msg instanceof GetBlobRequest) { + onGetBlobRequest((GetBlobRequest) msg, address); + } + + ctx.fireChannelRead(msg); + } + + private void onGetHeadRequest(GetHeadRequest request, InetSocketAddress address) throws Exception { + observer.gotMessageFrom(request.getClientId(), "get head", address); + } + + private void onGetSegmentRequest(GetSegmentRequest request, InetSocketAddress address) throws Exception { + observer.gotMessageFrom(request.getClientId(), "get segment", address); + } + + private void onGetBlobRequest(GetBlobRequest request, InetSocketAddress address) throws Exception { + observer.gotMessageFrom(request.getClientId(), "get blob id", address); + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java (working copy) @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; + +/** + * Notifies an observer when a 'get segment' or 'get blob' response is sent + * from this server. + */ +class ResponseObserverHandler extends ChannelOutboundHandlerAdapter { + + private final CommunicationObserver observer; + + ResponseObserverHandler(CommunicationObserver observer) { + this.observer = observer; + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof GetSegmentResponse) { + onGetSegmentResponse((GetSegmentResponse) msg); + } else if (msg instanceof GetBlobResponse) { + onGetBlobResponse((GetBlobResponse) msg); + } + + ctx.write(msg, promise); + } + + private void onGetSegmentResponse(GetSegmentResponse response) { + observer.didSendSegmentBytes(response.getClientId(), response.getSegment().size()); + } + + private void onGetBlobResponse(GetBlobResponse response) { + observer.didSendBinariesBytes(response.getClientId(), (int) Math.max(0, response.getBlob().length())); + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java (working copy) @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import org.apache.jackrabbit.oak.api.Blob; + +interface StandbyBlobReader { + + Blob readBlob(String blobId); + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java (working copy) @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import org.apache.jackrabbit.oak.segment.RecordId; + +/** + * Read the head record ID. + */ +interface StandbyHeadReader { + + /** + * Read the head record ID. + * + * @return the head record ID or {@code null} if the head record ID can't be + * found. + */ + RecordId readHeadRecordId(); + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java (working copy) @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import java.util.UUID; + +import org.apache.jackrabbit.oak.segment.Segment; + +interface StandbySegmentReader { + + Segment readSegment(UUID segmentId); + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (revision 1760352) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (working copy) @@ -46,9 +46,6 @@ import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.util.CharsetUtil; import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.standby.codec.BlobEncoder; -import org.apache.jackrabbit.oak.segment.standby.codec.RecordIdEncoder; -import org.apache.jackrabbit.oak.segment.standby.codec.SegmentEncoder; import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; import org.slf4j.Logger; @@ -65,11 +62,12 @@ private final EventLoopGroup workerGroup; private final ServerBootstrap b; private final CommunicationObserver observer; - private final StandbyServerHandler handler; private SslContext sslContext; private ChannelFuture channelFuture; private boolean running; + private volatile String state; + public StandbyServer(int port, final FileStore store) throws CertificateException, SSLException { this(port, store, null, false); } @@ -91,7 +89,6 @@ } observer = new CommunicationObserver("primary"); - handler = new StandbyServerHandler(store, observer); bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); @@ -124,24 +121,49 @@ p.addLast(sslContext.newHandler(ch.alloc())); } + // Decoders + p.addLast(new LineBasedFrameDecoder(8192)); p.addLast(new StringDecoder(CharsetUtil.UTF_8)); + p.addLast(new RequestDecoder()); + p.addLast(new StateHandler(newStateConsumer())); + p.addLast(new RequestObserverHandler(observer)); + + // Encoders + p.addLast(new SnappyFramedEncoder()); - p.addLast(new RecordIdEncoder()); - p.addLast(new SegmentEncoder()); - p.addLast(new BlobEncoder()); - p.addLast(handler); + p.addLast(new GetHeadResponseEncoder()); + p.addLast(new GetSegmentResponseEncoder()); + p.addLast(new GetBlobResponseEncoder()); + p.addLast(new ResponseObserverHandler(observer)); + + // Handlers + + p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store))); + p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(store))); + p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(store))); } }); } + private StateConsumer newStateConsumer() { + return new StateConsumer() { + + @Override + public void consumeState(String state) { + StandbyServer.this.state = state; + } + + }; + } + public String getMBeanName() { return StandbyStatusMBean.JMX_NAME + ",id=" + this.port; } public void close() { stop(); - handler.state = STATUS_CLOSING; + state = STATUS_CLOSING; observer.unregister(); final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); try { @@ -157,7 +179,7 @@ if (workerGroup != null && !workerGroup.isShuttingDown()) { workerGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly(); } - handler.state = STATUS_CLOSED; + state = STATUS_CLOSED; } @Override @@ -166,7 +188,7 @@ return; } - handler.state = STATUS_STARTING; + state = STATUS_STARTING; channelFuture = b.bind(port); @@ -189,20 +211,20 @@ private void onSuccessfulStart() { log.debug("Binding was successful"); - handler.state = STATUS_RUNNING; + state = STATUS_RUNNING; running = true; } private void onUnsuccessfulStart() { log.debug("Binding was unsuccessful", channelFuture.cause()); - handler.state = null; + state = null; running = false; throw new RuntimeException(channelFuture.cause()); } private void onStartTimeOut() { log.debug("Binding timed out, canceling"); - handler.state = null; + state = null; running = false; channelFuture.cancel(true); } @@ -219,13 +241,14 @@ public void stop() { if (running) { running = false; - this.handler.state = STATUS_STOPPED; + this.state = STATUS_STOPPED; channelFuture.channel().disconnect(); } } @Override public String getStatus() { - return handler == null ? STATUS_INITIALIZING : handler.state; + return state == null ? STATUS_INITIALIZING : state; } + } Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerHandler.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerHandler.java (revision 1760352) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerHandler.java (working copy) @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.jackrabbit.oak.segment.standby.server; - -import java.net.InetSocketAddress; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import org.apache.jackrabbit.oak.api.Blob; -import org.apache.jackrabbit.oak.api.IllegalRepositoryStateException; -import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; -import org.apache.jackrabbit.oak.segment.RecordId; -import org.apache.jackrabbit.oak.segment.Segment; -import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.standby.codec.Messages; -import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; -import org.apache.jackrabbit.oak.spi.blob.BlobStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Sharable -public class StandbyServerHandler extends SimpleChannelInboundHandler { - - private static final Logger log = LoggerFactory.getLogger(StandbyServerHandler.class); - - private final FileStore store; - - private final CommunicationObserver observer; - - public String state; - - public StandbyServerHandler(FileStore store, CommunicationObserver observer) { - this.store = store; - this.observer = observer; - } - - private RecordId headId() { - if (store != null) { - return store.getHead().getRecordId(); - } - return null; - } - - @Override - public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception { - state = "channel registered"; - super.channelRegistered(ctx); - } - - @Override - public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception { - state = "channel active"; - super.channelActive(ctx); - } - - @Override - public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception { - state = "channel inactive"; - super.channelInactive(ctx); - } - - @Override - public void channelUnregistered(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception { - state = "channel unregistered"; - super.channelUnregistered(ctx); - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, String payload) throws Exception { - state = "got message"; - - String request = Messages.extractMessageFrom(payload); - InetSocketAddress client = (InetSocketAddress) ctx.channel().remoteAddress(); - - String clientID = Messages.extractClientFrom(payload); - observer.gotMessageFrom(clientID, request, client); - if (Messages.GET_HEAD.equalsIgnoreCase(request)) { - RecordId r = headId(); - if (r != null) { - ctx.writeAndFlush(r); - return; - } - } else if (request.startsWith(Messages.GET_SEGMENT)) { - String sid = request.substring(Messages.GET_SEGMENT.length()); - log.debug("request segment id {}", sid); - UUID uuid = UUID.fromString(sid); - - Segment s = null; - - for (int i = 0; i < 10; i++) { - try { - s = store.readSegment(store.newSegmentId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); - } catch (IllegalRepositoryStateException e) { - // segment not found - log.debug("waiting for segment. Got exception: " + e.getMessage()); - TimeUnit.MILLISECONDS.sleep(2000); - } - if (s != null) { - break; - } - } - - if (s != null) { - log.debug("sending segment " + sid + " to " + client); - ctx.writeAndFlush(s); - observer.didSendSegmentBytes(clientID, s.size()); - return; - } - } else if (request.startsWith(Messages.GET_BLOB)) { - String bid = request.substring(Messages.GET_BLOB.length()); - log.debug("request blob id {}", bid); - Blob b = readBlob(bid); - log.debug("sending blob " + bid + " to " + client); - ctx.writeAndFlush(b); - observer.didSendBinariesBytes(clientID, - Math.max(0, (int) b.length())); - return; - } else { - log.warn("Unknown request {}, ignoring.", request); - } - ctx.writeAndFlush(Unpooled.EMPTY_BUFFER); - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) { - ctx.flush(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - state = "exception occurred: " + cause.getMessage(); - boolean isReadTimeout = cause.getMessage() != null - && cause.getMessage().contains("Connection reset by peer"); - if (isReadTimeout) { - log.warn("Exception occurred: " + cause.getMessage(), cause); - } else { - log.error("Exception occurred: " + cause.getMessage(), cause); - } - } - - private Blob readBlob(String blobId) { - BlobStore blobStore = store.getBlobStore(); - - if (blobStore != null) { - return new BlobStoreBlob(blobStore, blobId); - } - - throw new IllegalStateException("Attempt to read external blob with blobId [" + blobId + "] without specifying BlobStore"); - } - -} Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java (working copy) @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +/** + * Implementors of this interface can consume the state of the communication + * pipeline as tracked by {@link StateHandler}. + */ +interface StateConsumer { + + void consumeState(String state); + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java (working copy) @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +/** + * Tracks the state of the communication pipeline and communicates it to an + * external consumer. + */ +class StateHandler extends ChannelInboundHandlerAdapter { + + private final StateConsumer consumer; + + StateHandler(StateConsumer consumer) { + this.consumer = consumer; + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + consumer.consumeState("channel registered"); + super.channelRegistered(ctx); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + consumer.consumeState("channel active"); + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + consumer.consumeState("channel inactive"); + super.channelInactive(ctx); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + consumer.consumeState("channel unregistered"); + super.channelUnregistered(ctx); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + consumer.consumeState("got message"); + super.channelRead(ctx, msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + consumer.consumeState("exception occurred: " + cause.getMessage()); + super.exceptionCaught(ctx, cause); + } + +} Property changes on: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java (revision 0) +++ src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java (working copy) @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.jackrabbit.oak.api.Blob; +import org.junit.Test; + +public class GetBlobRequestHandlerTest { + + @Test + public void successfulReadsShouldGenerateResponses() throws Exception { + Blob blob = mock(Blob.class); + + StandbyBlobReader reader = mock(StandbyBlobReader.class); + when(reader.readBlob("blobId")).thenReturn(blob); + + EmbeddedChannel channel = new EmbeddedChannel(new GetBlobRequestHandler(reader)); + channel.writeInbound(new GetBlobRequest("clientId", "blobId")); + GetBlobResponse response = (GetBlobResponse) channel.readOutbound(); + assertEquals("clientId", response.getClientId()); + assertSame(blob, response.getBlob()); + } + + @Test + public void unsuccessfulReadsShouldBeDiscarded() throws Exception { + StandbyBlobReader reader = mock(StandbyBlobReader.class); + when(reader.readBlob("blobId")).thenReturn(null); + + EmbeddedChannel channel = new EmbeddedChannel(new GetBlobRequestHandler(reader)); + channel.writeInbound(new GetBlobRequest("clientId", "blobId")); + assertNull(channel.readOutbound()); + } + + @Test + public void unrecognizedMessagesShouldBeIgnored() throws Exception { + StandbyBlobReader reader = mock(StandbyBlobReader.class); + EmbeddedChannel channel = new EmbeddedChannel(new GetBlobRequestHandler(reader)); + channel.writeInbound("unrecognized"); + assertEquals("unrecognized", channel.readInbound()); + } + +} Property changes on: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java (revision 0) +++ src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java (working copy) @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.hash; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; + +import com.google.common.base.Charsets; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.segment.standby.codec.Messages; +import org.junit.Test; + +public class GetBlobResponseEncoderTest { + + @Test + public void encodeResponse() throws Exception { + byte[] data = new byte[] {1, 2, 3}; + + String contentIdentity = "contentIdentity"; + byte[] contentIdentityBytes = contentIdentity.getBytes(Charsets.UTF_8); + + Blob blob = mock(Blob.class); + when(blob.getNewStream()).thenReturn(new ByteArrayInputStream(data)); + when(blob.getContentIdentity()).thenReturn(contentIdentity); + + EmbeddedChannel channel = new EmbeddedChannel(new GetBlobResponseEncoder()); + channel.writeOutbound(new GetBlobResponse("clientId", blob)); + ByteBuf buffer = (ByteBuf) channel.readOutbound(); + + ByteBuf expected = Unpooled.buffer(); + expected.writeInt(3); + expected.writeByte(Messages.HEADER_BLOB); + expected.writeInt(contentIdentityBytes.length); + expected.writeBytes(contentIdentityBytes); + expected.writeLong(hash(data)); + expected.writeBytes(data); + + assertEquals(expected, buffer); + } + +} Property changes on: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java (revision 0) +++ src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java (working copy) @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockRecordId; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.junit.Test; + +public class GetHeadRequestHandlerTest { + + @Test + public void successfulReadsShouldGenerateResponses() throws Exception { + RecordId headRecordId = mockRecordId(1, 2, 8); + + StandbyHeadReader reader = mock(StandbyHeadReader.class); + when(reader.readHeadRecordId()).thenReturn(headRecordId); + + EmbeddedChannel channel = new EmbeddedChannel(new GetHeadRequestHandler(reader)); + channel.writeInbound(new GetHeadRequest("clientId")); + GetHeadResponse response = (GetHeadResponse) channel.readOutbound(); + assertSame(headRecordId, response.getHeadRecordId()); + assertEquals("clientId", response.getClientId()); + } + + @Test + public void unsuccessfulReadsShouldBeDiscarded() throws Exception { + StandbyHeadReader reader = mock(StandbyHeadReader.class); + when(reader.readHeadRecordId()).thenReturn(null); + + EmbeddedChannel channel = new EmbeddedChannel(new GetHeadRequestHandler(reader)); + channel.writeInbound(new GetHeadRequest("clientId")); + assertNull(channel.readOutbound()); + } + + @Test + public void unrecognizedMessagesShouldBeIgnored() throws Exception { + EmbeddedChannel channel = new EmbeddedChannel(new GetHeadRequestHandler(mock(StandbyHeadReader.class))); + channel.writeInbound("unrecognized"); + assertEquals("unrecognized", channel.readInbound()); + } + +} Property changes on: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java (revision 0) +++ src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java (working copy) @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockRecordId; +import static org.junit.Assert.assertEquals; + +import com.google.common.base.Charsets; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.standby.codec.Messages; +import org.junit.Test; + +public class GetHeadResponseEncoderTest { + + @Test + public void encodeResponse() throws Exception { + RecordId recordId = mockRecordId(1, 2, 8); + + EmbeddedChannel channel = new EmbeddedChannel(new GetHeadResponseEncoder()); + channel.writeOutbound(new GetHeadResponse("clientId", recordId)); + ByteBuf buffer = (ByteBuf) channel.readOutbound(); + + ByteBuf expected = Unpooled.buffer(); + expected.writeInt(recordId.toString().getBytes(Charsets.UTF_8).length + 1); + expected.writeByte(Messages.HEADER_RECORD); + expected.writeBytes(recordId.toString().getBytes(Charsets.UTF_8)); + assertEquals(expected, buffer); + } + +} Property changes on: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java (revision 0) +++ src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java (working copy) @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockSegment; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.UUID; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.jackrabbit.oak.segment.Segment; +import org.junit.Test; + +public class GetSegmentRequestHandlerTest { + + @Test + public void successfulReadsShouldGenerateResponses() throws Exception { + UUID uuid = new UUID(1, 2); + + Segment segment = mockSegment(uuid, new byte[] {3, 4, 5}); + + StandbySegmentReader reader = mock(StandbySegmentReader.class); + when(reader.readSegment(uuid)).thenReturn(segment); + + EmbeddedChannel channel = new EmbeddedChannel(new GetSegmentRequestHandler(reader)); + channel.writeInbound(new GetSegmentRequest("clientId", uuid)); + GetSegmentResponse response = (GetSegmentResponse) channel.readOutbound(); + assertEquals("clientId", response.getClientId()); + assertSame(segment, response.getSegment()); + } + + @Test + public void unsuccessfulReadsShouldBeDiscarded() throws Exception { + UUID uuid = new UUID(1, 2); + + StandbySegmentReader reader = mock(StandbySegmentReader.class); + when(reader.readSegment(uuid)).thenReturn(null); + + EmbeddedChannel channel = new EmbeddedChannel(new GetSegmentRequestHandler(reader)); + channel.writeInbound(new GetSegmentRequest("clientId", uuid)); + assertNull(channel.readOutbound()); + } + + @Test + public void unrecognizedMessagesShouldBeIgnored() throws Exception { + StandbySegmentReader reader = mock(StandbySegmentReader.class); + EmbeddedChannel channel = new EmbeddedChannel(new GetSegmentRequestHandler(reader)); + channel.writeInbound("unrecognized"); + assertEquals("unrecognized", channel.readInbound()); + } + +} Property changes on: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java (revision 0) +++ src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java (working copy) @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.hash; +import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockSegment; +import static org.junit.Assert.assertEquals; + +import java.util.UUID; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.standby.codec.Messages; +import org.junit.Test; + +public class GetSegmentResponseEncoderTest { + + @Test + public void encodeResponse() throws Exception { + UUID uuid = new UUID(1, 2); + byte[] data = new byte[] {3, 4, 5}; + Segment segment = mockSegment(uuid, data); + + EmbeddedChannel channel = new EmbeddedChannel(new GetSegmentResponseEncoder()); + channel.writeOutbound(new GetSegmentResponse("clientId", segment)); + ByteBuf buffer = (ByteBuf) channel.readOutbound(); + + ByteBuf expected = Unpooled.buffer(); + expected.writeInt(data.length + 25); + expected.writeByte(Messages.HEADER_SEGMENT); + expected.writeLong(uuid.getMostSignificantBits()); + expected.writeLong(uuid.getLeastSignificantBits()); + expected.writeLong(hash(data)); + expected.writeBytes(data); + + assertEquals(expected, buffer); + } + +} Property changes on: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java (revision 1760352) +++ src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java (working copy) @@ -20,6 +20,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import java.util.UUID; + import io.netty.channel.embedded.EmbeddedChannel; import org.apache.jackrabbit.oak.segment.standby.codec.Messages; import org.junit.Test; @@ -37,10 +39,10 @@ @Test public void shouldDecodeGetSegmentRequests() throws Exception { EmbeddedChannel channel = new EmbeddedChannel(new RequestDecoder()); - channel.writeInbound(Messages.newGetSegmentRequest("clientId", "segmentId", false)); + channel.writeInbound(Messages.newGetSegmentRequest("clientId", new UUID(1, 2).toString(), false)); GetSegmentRequest request = (GetSegmentRequest) channel.readInbound(); assertEquals("clientId", request.getClientId()); - assertEquals("segmentId", request.getSegmentId()); + assertEquals(new UUID(1, 2), request.getSegmentId()); } @Test Index: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/ServerTestUtils.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/segment/standby/server/ServerTestUtils.java (revision 0) +++ src/test/java/org/apache/jackrabbit/oak/segment/standby/server/ServerTestUtils.java (working copy) @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import static org.mockito.Mockito.mock; + +import java.nio.ByteBuffer; +import java.util.UUID; + +import com.google.common.hash.Hashing; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.SegmentReader; +import org.apache.jackrabbit.oak.segment.SegmentStore; + +class ServerTestUtils { + + private ServerTestUtils() { + // Prevent instantiation. + } + + static RecordId mockRecordId(long msb, long lsb, int offset) { + return new RecordId(new SegmentId(mock(SegmentStore.class), msb, lsb), offset); + } + + static Segment mockSegment(UUID uuid, byte[] buffer) { + SegmentStore store = mock(SegmentStore.class); + SegmentReader reader = mock(SegmentReader.class); + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + SegmentId id = new SegmentId(store, msb, lsb); + ByteBuffer data = ByteBuffer.wrap(buffer); + return new Segment(store, reader, id, data); + } + + static long hash(byte[] data) { + return Hashing.murmur3_32().newHasher().putBytes(data).hash().padToLong(); + } + +} Property changes on: src/test/java/org/apache/jackrabbit/oak/segment/standby/server/ServerTestUtils.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property