diff --git src/main/java/org/apache/hadoop/hbase/DeserializationException.java src/main/java/org/apache/hadoop/hbase/DeserializationException.java
new file mode 100644
index 0000000..c062f20
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/DeserializationException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+/**
+ * Failed deserialization: serialized bytes could not be parsed back into an object.
+ */
+@SuppressWarnings("serial")
+public class DeserializationException extends HBaseException {
+  public DeserializationException() {
+    super();
+  }
+
+  public DeserializationException(final String message) {
+    super(message);
+  }
+
+  public DeserializationException(final String message, final Throwable t) {
+    super(message, t);
+  }
+
+  public DeserializationException(final Throwable t) {
+    super(t);
+  }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/EmptyWatcher.java src/main/java/org/apache/hadoop/hbase/EmptyWatcher.java
deleted file mode 100644
index 9881ec2..0000000
--- src/main/java/org/apache/hadoop/hbase/EmptyWatcher.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Copyright 2009 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-
-/**
- * An empty ZooKeeper watcher
- */
-@InterfaceAudience.Private
-public class EmptyWatcher implements Watcher {
-  public static EmptyWatcher instance = new EmptyWatcher();
-  private EmptyWatcher() {}
-
-  public void process(WatchedEvent event) {}
-}
diff --git src/main/java/org/apache/hadoop/hbase/HBaseException.java src/main/java/org/apache/hadoop/hbase/HBaseException.java
new file mode 100644
index 0000000..ec0f108
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/HBaseException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+/**
+ * Base checked exception in HBase.
+ * @see <a href="https://issues.apache.org/jira/browse/HBASE-5796">HBASE-5796</a>
+ */
+@SuppressWarnings("serial")
+public class HBaseException extends Exception {
+  public HBaseException() {
+    super();
+  }
+
+  public HBaseException(final String message) {
+    super(message);
+  }
+
+  public HBaseException(final String message, final Throwable t) {
+    super(message, t);
+  }
+
+  public HBaseException(final Throwable t) {
+    super(t);
+  }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/RegionTransition.java src/main/java/org/apache/hadoop/hbase/RegionTransition.java
new file mode 100644
index 0000000..1471d4e
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/RegionTransition.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Current state of a region in transition.  Holds state of a region as it moves through the
+ * steps that take it from offline to open, etc.  Used by regionserver, master, and zk packages.
+ * Encapsulates protobuf serialization/deserialization so we don't leak generated pb outside this
+ * package.
+ * <p>Immutable
+ */
+@InterfaceAudience.Private
+public class RegionTransition {
+  private final ZooKeeperProtos.RegionTransition rt;
+
+  RegionTransition(final ZooKeeperProtos.RegionTransition rt) {
+    this.rt = rt;
+  }
+
+  public EventHandler.EventType getEventType() {
+    return EventHandler.EventType.get(this.rt.getEventTypeCode());
+  }
+
+  public ServerName getServerName() {
+    return ProtobufUtil.toServerName(this.rt.getOriginServerName());
+  }
+
+  public long getCreateTime() {
+    return this.rt.getCreateTime();
+  }
+
+  public byte [] getRegionName() {
+    return this.rt.getRegionName().toByteArray();
+  }
+
+  public byte [] getPayload() {
+    return this.rt.getPayload().toByteArray();
+  }
+
+  @Override
+  public String toString() {
+    byte [] payload = getPayload();
+    return "region=" + Bytes.toStringBinary(getRegionName()) + ", state=" + getEventType() +
+      ", servername=" + getServerName() + ", createTime=" + this.getCreateTime() +
+      ", payload.length=" + (payload == null? 0: payload.length);
+  }
+
+  /**
+   * @param type Transition event type
+   * @param regionName Name of the region in transition
+   * @param sn Server the transition originated on; must not be null
+   * @return a new {@link RegionTransition} carrying no payload
+   * @see #parseFrom(byte[])
+   */
+  public static RegionTransition createRegionTransition(final EventType type,
+      final byte [] regionName, final ServerName sn) {
+    return createRegionTransition(type, regionName, sn, null);
+  }
+
+  /**
+   * @param type Transition event type
+   * @param regionName Name of the region in transition
+   * @param sn Server the transition originated on; must not be null
+   * @param payload May be null
+   * @return a new {@link RegionTransition} whose create time is the current time
+   * @see #parseFrom(byte[])
+   */
+  public static RegionTransition createRegionTransition(final EventType type,
+      final byte [] regionName, final ServerName sn, final byte [] payload) {
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName pbsn =
+      org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder().
+        setHostName(sn.getHostname()).setPort(sn.getPort()).setStartCode(sn.getStartcode()).build();
+    ZooKeeperProtos.RegionTransition.Builder builder = ZooKeeperProtos.RegionTransition.newBuilder().
+      setEventTypeCode(type.getCode()).setRegionName(ByteString.copyFrom(regionName)).
+        setOriginServerName(pbsn).setCreateTime(System.currentTimeMillis());
+    if (payload != null) builder.setPayload(ByteString.copyFrom(payload));
+    return new RegionTransition(builder.build());
+  }
+
+  /**
+   * @param data Serialized data to parse; must begin with the pb magic prefix.
+   * @return A RegionTransition instance made of the passed data
+   * @throws DeserializationException if the protobuf after the prefix cannot be parsed
+   * @see #toByteArray()
+   */
+  public static RegionTransition parseFrom(final byte [] data) throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(data);
+    try {
+      int prefixLen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.RegionTransition rt = ZooKeeperProtos.RegionTransition.newBuilder().
+        mergeFrom(data, prefixLen, data.length - prefixLen).build();
+      return new RegionTransition(rt);
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  /**
+   * @return This instance serialized into a byte array
+   * @see #parseFrom(byte[])
+   */
+  public byte [] toByteArray() {
+    return ProtobufUtil.prependPBMagic(this.rt.toByteArray());
+  }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/ServerName.java src/main/java/org/apache/hadoop/hbase/ServerName.java
index 8fdb624..1a40153 100644
--- src/main/java/org/apache/hadoop/hbase/ServerName.java
+++ src/main/java/org/apache/hadoop/hbase/ServerName.java
@@ -24,9 +24,13 @@ import java.util.regex.Pattern;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RootRegionServer;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.protobuf.InvalidProtocolBufferException;
+
 /**
  * Instance of an HBase ServerName.
  * A server name is used uniquely identifying a server instance and is made
@@ -296,4 +300,47 @@ public class ServerName implements Comparable {
     return SERVERNAME_PATTERN.matcher(str).matches()? new ServerName(str):
       new ServerName(str, NON_STARTCODE);
   }
-}
+
+  /**
+   * Get a ServerName from the passed in data bytes.
+   * @param data Data with a serialized server name in it; can handle the old style
+   * servername where servername was host and port.  Works too with data that
+   * begins w/ the pb 'PBUF' magic and that is then followed by a protobuf that
+   * has a serialized {@link ServerName} in it.
+   * @return Returns null if data is null or empty else converts passed data
+   * to a ServerName instance.
+   * @throws DeserializationException if pb-prefixed data fails to parse
+   */
+  public static ServerName parseFrom(final byte [] data) throws DeserializationException {
+    if (data == null || data.length <= 0) return null;
+    if (ProtobufUtil.isPBMagicPrefix(data)) {
+      int prefixLen = ProtobufUtil.lengthOfPBMagic();
+      try {
+        RootRegionServer rss =
+          RootRegionServer.newBuilder().mergeFrom(data, prefixLen, data.length - prefixLen).build();
+        org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName sn = rss.getServer();
+        return new ServerName(sn.getHostName(), sn.getPort(), sn.getStartCode());
+      } catch (InvalidProtocolBufferException e) {
+        // A failed parse of the znode is pretty catastrophic. Rather than loop
+        // retrying hoping the bad bytes will change, fail fast: wrap the protobuf
+        // parse error in a checked DeserializationException so the caller can
+        // decide how to react.
+        // This should "never" happen.
+        throw new DeserializationException(e);
+      }
+    }
+    // The str returned could be old style -- pre hbase-1502 -- which was
+    // hostname and port separated by a colon rather than hostname, port and
+    // startcode delimited by a ','.
+    String str = Bytes.toString(data);
+    int index = str.indexOf(ServerName.SERVERNAME_SEPARATOR);
+    if (index != -1) {
+      // Presume it's a ServerName serialized with versioned bytes.
+      return ServerName.parseVersionedServerName(data);
+    }
+    // Presume it's in hostname:port format.
+    String hostname = Addressing.parseHostname(str);
+    int port = Addressing.parsePort(str);
+    return new ServerName(hostname, port, -1L);
+  }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
new file mode 100644
index 0000000..45e385c
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Counters kept by the distributed WAL split log process.
+ * Used by master and regionserver packages.
+ */
+@InterfaceAudience.Private
+public class SplitLogCounters {
+  //SplitLogManager counters
+  public static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0);
+  public static AtomicLong tot_mgr_log_split_batch_success =
+    new AtomicLong(0);
+  public static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0);
+  public static AtomicLong tot_mgr_new_unexpected_hlogs = new AtomicLong(0);
+  public static AtomicLong tot_mgr_log_split_start = new AtomicLong(0);
+  public static AtomicLong tot_mgr_log_split_success = new AtomicLong(0);
+  public static AtomicLong tot_mgr_log_split_err = new AtomicLong(0);
+  public static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0);
+  public static AtomicLong tot_mgr_node_create_result = new AtomicLong(0);
+  public static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0);
+  public static AtomicLong tot_mgr_node_create_err = new AtomicLong(0);
+  public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
+  public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
+  public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
+  public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0);
+  public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
+  public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
+  public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
+  public static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0);
+  public static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0);
+  public static AtomicLong tot_mgr_resubmit = new AtomicLong(0);
+  public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
+  public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
+  public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
+  public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0);
+  public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
+  public static AtomicLong tot_mgr_resubmit_threshold_reached =
+    new AtomicLong(0);
+  public static AtomicLong tot_mgr_missing_state_in_delete =
+    new AtomicLong(0);
+  public static AtomicLong tot_mgr_heartbeat = new AtomicLong(0);
+  public static AtomicLong tot_mgr_rescan = new AtomicLong(0);
+  public static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0);
+  public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0);
+  public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0);
+  public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0);
+  public static AtomicLong tot_mgr_resubmit_dead_server_task =
+    new AtomicLong(0);
+
+  // SplitLogWorker counters
+  public static AtomicLong tot_wkr_failed_to_grab_task_no_data =
+    new AtomicLong(0);
+  public static AtomicLong tot_wkr_failed_to_grab_task_exception =
+    new AtomicLong(0);
+  public static AtomicLong tot_wkr_failed_to_grab_task_owned =
+    new AtomicLong(0);
+  public static AtomicLong tot_wkr_failed_to_grab_task_lost_race =
+    new AtomicLong(0);
+  public static AtomicLong tot_wkr_task_acquired = new AtomicLong(0);
+  public static AtomicLong tot_wkr_task_resigned = new AtomicLong(0);
+  public static AtomicLong tot_wkr_task_done = new AtomicLong(0);
+  public static AtomicLong tot_wkr_task_err = new AtomicLong(0);
+  public static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0);
+  public static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0);
+  public static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0);
+  public static AtomicLong tot_wkr_get_data_result = new AtomicLong(0);
+  public static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0);
+  public static AtomicLong tot_wkr_preempt_task = new AtomicLong(0);
+  public static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0);
+  public static AtomicLong tot_wkr_final_transistion_failed =
+    new AtomicLong(0);
+
+  public static void resetCounters() throws Exception {
+    for (Field fld : SplitLogCounters.class.getDeclaredFields()) {
+      // Only the AtomicLong counters are reset; skip any other declared field.
+      if (fld.getType() != AtomicLong.class) continue;
+      ((AtomicLong)fld.get(null)).set(0);
+    }
+  }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/SplitLogTask.java src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
new file mode 100644
index 0000000..1f16e35
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * State of a WAL log split during distributed splitting.  State is kept up in zookeeper.
+ * Encapsulates protobuf serialization/deserialization so we don't leak generated pb outside of
+ * this class.  Used by regionserver and master packages.
+ * <p>Immutable
+ */
+@InterfaceAudience.Private
+public class SplitLogTask {
+  private final ServerName originServer;
+  private final ZooKeeperProtos.SplitLogTask.State state;
+
+  public static class Unassigned extends SplitLogTask {
+    public Unassigned(final ServerName originServer) {
+      super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED);
+    }
+  }
+
+  public static class Owned extends SplitLogTask {
+    public Owned(final ServerName originServer) {
+      super(originServer, ZooKeeperProtos.SplitLogTask.State.OWNED);
+    }
+  }
+
+  public static class Resigned extends SplitLogTask {
+    public Resigned(final ServerName originServer) {
+      super(originServer, ZooKeeperProtos.SplitLogTask.State.RESIGNED);
+    }
+  }
+
+  public static class Done extends SplitLogTask {
+    public Done(final ServerName originServer) {
+      super(originServer, ZooKeeperProtos.SplitLogTask.State.DONE);
+    }
+  }
+
+  public static class Err extends SplitLogTask {
+    public Err(final ServerName originServer) {
+      super(originServer, ZooKeeperProtos.SplitLogTask.State.ERR);
+    }
+  }
+
+  SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state) {
+    this.originServer = originServer;
+    this.state = state;
+  }
+
+  public boolean isUnassigned(final ServerName sn) {
+    return this.originServer.equals(sn) && isUnassigned();
+  }
+
+  public boolean isUnassigned() {
+    return this.state == ZooKeeperProtos.SplitLogTask.State.UNASSIGNED;
+  }
+
+  public boolean isOwned(final ServerName sn) {
+    return this.originServer.equals(sn) && isOwned();
+  }
+
+  public boolean isOwned() {
+    return this.state == ZooKeeperProtos.SplitLogTask.State.OWNED;
+  }
+
+  public boolean isResigned(final ServerName sn) {
+    return this.originServer.equals(sn) && isResigned();
+  }
+
+  public boolean isResigned() {
+    return this.state == ZooKeeperProtos.SplitLogTask.State.RESIGNED;
+  }
+
+  public boolean isDone(final ServerName sn) {
+    return this.originServer.equals(sn) && isDone();
+  }
+
+  public boolean isDone() {
+    return this.state == ZooKeeperProtos.SplitLogTask.State.DONE;
+  }
+
+  public boolean isErr(final ServerName sn) {
+    return this.originServer.equals(sn) && isErr();
+  }
+
+  public boolean isErr() {
+    return this.state == ZooKeeperProtos.SplitLogTask.State.ERR;
+  }
+
+  @Override
+  public String toString() {
+    return this.state.toString() + " " + this.originServer.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof SplitLogTask)) return false;
+    SplitLogTask other = (SplitLogTask)obj;
+    return other.state.equals(this.state) && other.originServer.equals(this.originServer);
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 7;
+    hash = 31 * hash + this.state.hashCode();
+    return 31 * hash + this.originServer.hashCode();
+  }
+
+  /**
+   * @param data Serialized data to parse; must begin with the pb magic prefix.
+   * @return A SplitLogTask instance made of the passed data
+   * @throws DeserializationException if the protobuf after the prefix cannot be parsed
+   * @see #toByteArray()
+   */
+  public static SplitLogTask parseFrom(final byte [] data) throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(data);
+    try {
+      int prefixLen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.SplitLogTask slts = ZooKeeperProtos.SplitLogTask.newBuilder().
+        mergeFrom(data, prefixLen, data.length - prefixLen).build();
+      return new SplitLogTask(ProtobufUtil.toServerName(slts.getServerName()), slts.getState());
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  /**
+   * @return This instance serialized into a byte array
+   * @see #parseFrom(byte[])
+   */
+  public byte [] toByteArray() {
+    // First create a pb ServerName, then a pb SplitLogTask carrying that server
+    // name and this task's state.  The serialized pb is then prefixed with the
+    // pb magic so parseFrom can recognize it.
+    HBaseProtos.ServerName snpb = ProtobufUtil.toServerName(this.originServer);
+    ZooKeeperProtos.SplitLogTask slts =
+      ZooKeeperProtos.SplitLogTask.newBuilder().setServerName(snpb).setState(this.state).build();
+    return ProtobufUtil.prependPBMagic(slts.toByteArray());
+  }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
index 4121508..fef0c58 100644
--- src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
+++ src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
@@ -105,7 +105,7 @@ public abstract class EventHandler implements Runnable, Comparable {
   public enum EventType {
     // Messages originating from RS (NOTE: there is NO direct communication from
     // RS to Master). These are a result of RS updates into ZK.
-    //RS_ZK_REGION_CLOSING (1),   // It is replaced by M_ZK_REGION_CLOSING(HBASE-4739)
+    // RS_ZK_REGION_CLOSING (1),   // It is replaced by M_ZK_REGION_CLOSING(HBASE-4739)
     RS_ZK_REGION_CLOSED       (2),   // RS has finished closing a region
     RS_ZK_REGION_OPENING      (3),   // RS is in process of opening a region
     RS_ZK_REGION_OPENED       (4),   // RS has finished opening a region
@@ -140,10 +140,27 @@ public abstract class EventHandler implements Runnable, Comparable {
     M_SERVER_SHUTDOWN         (70),  // Master is processing shutdown of a RS
     M_META_SERVER_SHUTDOWN    (72);  // Master is processing shutdown of RS hosting a meta region (-ROOT- or .META.).
 
+    private final int code;
+
     /**
      * Constructor
      */
-    EventType(int value) {}
+    EventType(final int code) {
+      this.code = code;
+    }
+
+    public int getCode() {
+      return this.code;
+    }
+
+    public static EventType get(final int code) {
+      // Linear scan over all EventTypes; acceptable since lookups are rare.
+      for (EventType et: EventType.values()) {
+        if (et.getCode() == code) return et;
+      }
+      throw new IllegalArgumentException("Unknown code " + code);
+    }
+
     public boolean isOnlineSchemaChangeSupported() {
       return (
         this.equals(EventType.C_M_ADD_FAMILY) ||
diff --git src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 06ca377..d898f38 100644
--- src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.executor;
 
 import java.io.IOException;
-import java.io.PrintWriter;
 import java.io.Writer;
 import java.lang.management.ThreadInfo;
 import java.util.List;
@@ -30,8 +29,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -40,7 +37,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
-import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
 
 import com.google.common.collect.Lists;
diff --git
src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java deleted file mode 100644 index 35d7b70..0000000 --- src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.executor; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.executor.EventHandler.EventType; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.io.Writable; - -/** - * Data serialized into ZooKeeper for region transitions. - */ -@InterfaceAudience.Private -public class RegionTransitionData implements Writable { - /** - * Type of transition event (offline, opening, opened, closing, closed). - * Required. - */ - private EventType eventType; - - /** Region being transitioned. Required. 
*/ - private byte [] regionName; - - /** Server event originated from. Optional. */ - private ServerName origin; - - /** Time the event was created. Required but automatically set. */ - private long stamp; - - private byte [] payload; - - /** - * Writable constructor. Do not use directly. - */ - public RegionTransitionData() {} - - /** - * Construct data for a new region transition event with the specified event - * type and region name. - * - *

Used when the server name is not known (the master is setting it). This - * happens during cluster startup or during failure scenarios. When - * processing a failed regionserver, the master assigns the regions from that - * server to other servers though the region was never 'closed'. During - * master failover, the new master may have regions stuck in transition - * without a destination so may have to set regions offline and generate a new - * assignment. - * - *

Since only the master uses this constructor, the type should always be - * {@link EventType#M_ZK_REGION_OFFLINE}. - * - * @param eventType type of event - * @param regionName name of region as per HRegionInfo#getRegionName() - */ - public RegionTransitionData(EventType eventType, byte [] regionName) { - this(eventType, regionName, null); - } - - /** - * Construct data for a new region transition event with the specified event - * type, region name, and server name. - * - *

Used when the server name is known (a regionserver is setting it). - * - *

Valid types for this constructor are {@link EventType#M_ZK_REGION_CLOSING}, - * {@link EventType#RS_ZK_REGION_CLOSED}, {@link EventType#RS_ZK_REGION_OPENING}, - * {@link EventType#RS_ZK_REGION_SPLITTING}, - * and {@link EventType#RS_ZK_REGION_OPENED}. - * - * @param eventType type of event - * @param regionName name of region as per HRegionInfo#getRegionName() - * @param origin Originating {@link ServerName} - */ - public RegionTransitionData(EventType eventType, byte [] regionName, - final ServerName origin) { - this(eventType, regionName, origin, null); - } - - /** - * Construct data for a new region transition event with the specified event - * type, region name, and server name. - * - *

Used when the server name is known (a regionserver is setting it). - * - *

Valid types for this constructor are {@link EventType#RS_ZK_REGION_SPLIT} - * since SPLIT is only type that currently carries a payload. - * - * @param eventType type of event - * @param regionName name of region as per HRegionInfo#getRegionName() - * @param serverName Originating {@link ServerName} - * @param payload Payload examples include the daughters involved in a - * {@link EventType#RS_ZK_REGION_SPLIT}. Can be null - */ - public RegionTransitionData(EventType eventType, byte [] regionName, - final ServerName serverName, final byte [] payload) { - this.eventType = eventType; - this.stamp = System.currentTimeMillis(); - this.regionName = regionName; - this.origin = serverName; - this.payload = payload; - } - - /** - * Gets the type of region transition event. - * - *

One of: - *

- * @return type of region transition event - */ - public EventType getEventType() { - return eventType; - } - - /** - * Gets the name of the region being transitioned. - * - *

Region name is required so this never returns null. - * @return region name, the result of a call to HRegionInfo#getRegionName() - */ - public byte [] getRegionName() { - return regionName; - } - - /** - * Gets the server the event originated from. If null, this event originated - * from the master. - * - * @return server name of originating regionserver, or null if from master - */ - public ServerName getOrigin() { - return origin; - } - - /** - * Gets the timestamp when this event was created. - * - * @return stamp event was created - */ - public long getStamp() { - return stamp; - } - - /** - * @return Payload if any. - */ - public byte [] getPayload() { - return this.payload; - } - - @Override - public void readFields(DataInput in) throws IOException { - // the event type byte - eventType = EventType.values()[in.readShort()]; - // the timestamp - stamp = in.readLong(); - // the encoded name of the region being transitioned - regionName = Bytes.readByteArray(in); - // remaining fields are optional so prefixed with boolean - // the name of the regionserver sending the data - if (in.readBoolean()) { - byte [] versionedBytes = Bytes.readByteArray(in); - this.origin = ServerName.parseVersionedServerName(versionedBytes); - } - if (in.readBoolean()) { - this.payload = Bytes.readByteArray(in); - } - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeShort(eventType.ordinal()); - out.writeLong(System.currentTimeMillis()); - Bytes.writeByteArray(out, regionName); - // remaining fields are optional so prefixed with boolean - out.writeBoolean(this.origin != null); - if (this.origin != null) { - Bytes.writeByteArray(out, this.origin.getVersionedBytes()); - } - out.writeBoolean(this.payload != null); - if (this.payload != null) { - Bytes.writeByteArray(out, this.payload); - } - } - - /** - * Get the bytes for this instance. 
Throws a {@link RuntimeException} if - * there is an error deserializing this instance because it represents a code - * bug. - * @return binary representation of this instance - */ - public byte [] getBytes() { - try { - return Writables.getBytes(this); - } catch(IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Get an instance from bytes. Throws a {@link RuntimeException} if - * there is an error serializing this instance from bytes because it - * represents a code bug. - * @param bytes binary representation of this instance - * @return instance of this class - */ - public static RegionTransitionData fromBytes(byte [] bytes) { - try { - RegionTransitionData data = new RegionTransitionData(); - Writables.getWritable(bytes, data); - return data; - } catch(IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public String toString() { - return "region=" + Bytes.toStringBinary(regionName) + ", origin=" + this.origin + - ", state=" + eventType; - } -} diff --git src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java index 47e3bd6..3441e46 100644 --- src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java +++ src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -138,7 +139,8 @@ class ActiveMasterManager extends ZooKeeperListener { // Try to become the active master, watch if there is another master. // Write out our ServerName as versioned bytes. 
try { - String backupZNode = ZKUtil.joinZNode(this.watcher.backupMasterAddressesZNode, this.sn.toString()); + String backupZNode = + ZKUtil.joinZNode(this.watcher.backupMasterAddressesZNode, this.sn.toString()); if (MasterAddressTracker.setMasterAddress(this.watcher, this.watcher.getMasterAddressZNode(), this.sn)) { // If we were a backup master before, delete our ZNode from the backup // master directory since we are the active now @@ -174,7 +176,14 @@ class ActiveMasterManager extends ZooKeeperListener { msg = ("A master was detected, but went down before its address " + "could be read. Attempting to become the next active master"); } else { - ServerName currentMaster = ZKUtil.znodeContentToServerName(bytes); + ServerName currentMaster; + try { + currentMaster = ServerName.parseFrom(bytes); + } catch (DeserializationException e) { + LOG.warn("Failed parse", e); + // Hopefully next time around we won't fail the parse. Dangerous. + continue; + } if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) { msg = ("Current master has this master's address, " + currentMaster + "; master was restarted? 
Deleting node."); diff --git src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 7881321..42a615c 100644 --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -48,10 +48,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; @@ -62,7 +64,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; -import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State; import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler; @@ -489,29 +490,34 @@ public class AssignmentManager extends ZooKeeperListener { * @throws IOException */ boolean processRegionInTransition(final String encodedRegionName, - final HRegionInfo regionInfo, - final Map>> deadServers) + final HRegionInfo regionInfo, final Map>> deadServers) throws KeeperException, IOException { Stat stat = new Stat(); - RegionTransitionData data = ZKAssign.getDataAndWatch(watcher, - encodedRegionName, stat); + byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat); if 
(data == null) return false; + RegionTransition rt; + try { + rt = RegionTransition.parseFrom(data); + } catch (DeserializationException e) { + LOG.warn("Failed parse znode data", e); + return false; + } HRegionInfo hri = regionInfo; if (hri == null) { - if ((hri = getHRegionInfo(data)) == null) return false; + if ((hri = getHRegionInfo(rt.getRegionName())) == null) return false; } - processRegionsInTransition(data, hri, deadServers, stat.getVersion()); + processRegionsInTransition(rt, hri, deadServers, stat.getVersion()); return true; } - void processRegionsInTransition(final RegionTransitionData data, - final HRegionInfo regionInfo, - final Map>> deadServers, - int expectedVersion) + void processRegionsInTransition(final RegionTransition rt, final HRegionInfo regionInfo, + final Map>> deadServers, int expectedVersion) throws KeeperException { + EventType et = rt.getEventType(); + // Get ServerName. Could be null. + ServerName sn = rt.getServerName(); String encodedRegionName = regionInfo.getEncodedName(); - LOG.info("Processing region " + regionInfo.getRegionNameAsString() + - " in state " + data.getEventType()); + LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et); synchronized (regionsInTransition) { RegionState regionState = regionsInTransition.get(encodedRegionName); if (regionState != null || @@ -519,21 +525,19 @@ public class AssignmentManager extends ZooKeeperListener { // Just return return; } - switch (data.getEventType()) { + switch (et) { case M_ZK_REGION_CLOSING: // If zk node of the region was updated by a live server skip this // region and just add it into RIT. - if (isOnDeadServer(regionInfo, deadServers) && - (data.getOrigin() == null || !serverManager.isServerOnline(data.getOrigin()))) { + if (isOnDeadServer(regionInfo, deadServers) && (sn == null || !isServerOnline(sn))) { // If was on dead server, its closed now. 
Force to OFFLINE and this // will get it reassigned if appropriate - forceOffline(regionInfo, data); + forceOffline(regionInfo, rt); } else { // Just insert region into RIT. // If this never updates the timeout will trigger new assignment - regionsInTransition.put(encodedRegionName, new RegionState( - regionInfo, RegionState.State.CLOSING, - data.getStamp(), data.getOrigin())); + regionsInTransition.put(encodedRegionName, + getRegionState(regionInfo, RegionState.State.CLOSING, rt)); } failoverProcessedRegions.put(encodedRegionName, regionInfo); break; @@ -541,27 +545,23 @@ public class AssignmentManager extends ZooKeeperListener { case RS_ZK_REGION_CLOSED: case RS_ZK_REGION_FAILED_OPEN: // Region is closed, insert into RIT and handle it - addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data); + addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt); failoverProcessedRegions.put(encodedRegionName, regionInfo); break; case M_ZK_REGION_OFFLINE: // If zk node of the region was updated by a live server skip this // region and just add it into RIT. 
- if (isOnDeadServer(regionInfo, deadServers) && - (data.getOrigin() == null || - !serverManager.isServerOnline(data.getOrigin()))) { + if (isOnDeadServer(regionInfo, deadServers) && (sn == null || !isServerOnline(sn))) { // Region is offline, insert into RIT and handle it like a closed - addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data); - } else if (data.getOrigin() != null && - !serverManager.isServerOnline(data.getOrigin())) { + addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt); + } else if (sn != null && !isServerOnline(sn)) { // to handle cases where offline node is created but sendRegionOpen // RPC is not yet sent - addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data); + addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt); } else { - regionsInTransition.put(encodedRegionName, new RegionState( - regionInfo, RegionState.State.PENDING_OPEN, data.getStamp(), data - .getOrigin())); + regionsInTransition.put(encodedRegionName, + getRegionState(regionInfo, RegionState.State.PENDING_OPEN, rt)); } failoverProcessedRegions.put(encodedRegionName, regionInfo); break; @@ -573,9 +573,8 @@ public class AssignmentManager extends ZooKeeperListener { // Just insert region into RIT // If this never updates the timeout will trigger new assignment if (regionInfo.isMetaTable()) { - regionsInTransition.put(encodedRegionName, new RegionState( - regionInfo, RegionState.State.OPENING, data.getStamp(), data - .getOrigin())); + regionsInTransition.put(encodedRegionName, + getRegionState(regionInfo, RegionState.State.OPENING, rt)); // If ROOT or .META. table is waiting for timeout monitor to assign // it may take lot of time when the assignment.timeout.period is // the default value which may be very long. 
We will not be able @@ -584,17 +583,15 @@ public class AssignmentManager extends ZooKeeperListener { processOpeningState(regionInfo); break; } - regionsInTransition.put(encodedRegionName, new RegionState(regionInfo, - RegionState.State.OPENING, data.getStamp(), data.getOrigin())); + regionsInTransition.put(encodedRegionName, + getRegionState(regionInfo, RegionState.State.OPENING, rt)); failoverProcessedRegions.put(encodedRegionName, regionInfo); break; case RS_ZK_REGION_OPENED: // Region is opened, insert into RIT and handle it - regionsInTransition.put(encodedRegionName, new RegionState( - regionInfo, RegionState.State.OPEN, - data.getStamp(), data.getOrigin())); - ServerName sn = data.getOrigin() == null? null: data.getOrigin(); + regionsInTransition.put(encodedRegionName, + getRegionState(regionInfo, RegionState.State.OPEN, rt)); // sn could be null if this server is no longer online. If // that is the case, just let this RIT timeout; it'll be assigned // to new server then. @@ -605,10 +602,9 @@ public class AssignmentManager extends ZooKeeperListener { } else if (!serverManager.isServerOnline(sn) && (isOnDeadServer(regionInfo, deadServers) || regionInfo.isMetaRegion() || regionInfo.isRootRegion())) { - forceOffline(regionInfo, data); + forceOffline(regionInfo, rt); } else { - new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion) - .process(); + new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion).process(); } failoverProcessedRegions.put(encodedRegionName, regionInfo); break; @@ -623,19 +619,18 @@ public class AssignmentManager extends ZooKeeperListener { /** * Put the region hri into an offline state up in zk. * @param hri - * @param oldData + * @param oldRt * @throws KeeperException */ - private void forceOffline(final HRegionInfo hri, - final RegionTransitionData oldData) + private void forceOffline(final HRegionInfo hri, final RegionTransition oldRt) throws KeeperException { // If was on dead server, its closed now. 
Force to OFFLINE and then // handle it like a close; this will get it reassigned if appropriate - LOG.debug("RIT " + hri.getEncodedName() + " in state=" + - oldData.getEventType() + " was on deadserver; forcing offline"); + LOG.debug("RIT " + hri.getEncodedName() + " in state=" + oldRt.getEventType() + + " was on deadserver; forcing offline"); ZKAssign.createOrForceNodeOffline(this.watcher, hri, this.master.getServerName()); - addToRITandCallClose(hri, RegionState.State.OFFLINE, oldData); + addToRITandCallClose(hri, RegionState.State.OFFLINE, oldRt); } /** @@ -646,13 +641,23 @@ public class AssignmentManager extends ZooKeeperListener { * @param oldData */ private void addToRITandCallClose(final HRegionInfo hri, - final RegionState.State state, final RegionTransitionData oldData) { - this.regionsInTransition.put(hri.getEncodedName(), - new RegionState(hri, state, oldData.getStamp(), oldData.getOrigin())); + final RegionState.State state, final RegionTransition oldData) { + this.regionsInTransition.put(hri.getEncodedName(), getRegionState(hri, state, oldData)); new ClosedRegionHandler(this.master, this, hri).process(); } /** + * @param hri + * @param state + * @param rt + * @return A new {@link RegionState} instance made of the passed arguments + */ + RegionState getRegionState(final HRegionInfo hri, final RegionState.State state, + final RegionTransition rt) { + return new RegionState(hri, state, rt.getCreateTime(), rt.getServerName()); + } + + /** * When a region is closed, it should be removed from the regionsToReopen * @param hri HRegionInfo of the region which was closed */ @@ -689,41 +694,46 @@ public class AssignmentManager extends ZooKeeperListener { *

* This deals with skipped transitions (we got a CLOSED but didn't see CLOSING * yet). - * @param data + * @param rt * @param expectedVersion */ - private void handleRegion(final RegionTransitionData data, int expectedVersion) { + private void handleRegion(final RegionTransition rt, int expectedVersion) { synchronized(regionsInTransition) { HRegionInfo hri = null; - if (data == null || data.getOrigin() == null) { - LOG.warn("Unexpected NULL input " + data); + if (rt == null) { + LOG.warn("Unexpected NULL input " + rt); + return; + } + final ServerName sn = rt.getServerName(); + if (sn == null) { + LOG.warn("Null servername: " + rt); return; } - ServerName sn = data.getOrigin(); // Check if this is a special HBCK transition if (sn.equals(HConstants.HBCK_CODE_SERVERNAME)) { - handleHBCK(data); + handleHBCK(rt); return; } - String encodedName = HRegionInfo.encodeRegionName(data.getRegionName()); + final long createTime = rt.getCreateTime(); + final byte [] regionName = rt.getRegionName(); + String encodedName = HRegionInfo.encodeRegionName(regionName); String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName); // Verify this is a known server if (!serverManager.isServerOnline(sn) && !this.master.getServerName().equals(sn) - && !ignoreStatesRSOffline.contains(data.getEventType())) { + && !ignoreStatesRSOffline.contains(rt.getEventType())) { LOG.warn("Attempted to handle region transition for server but " + "server is not online: " + prettyPrintedRegionName); return; } // Printing if the event was created a long time ago helps debugging - boolean lateEvent = data.getStamp() < - (System.currentTimeMillis() - 15000); - LOG.debug("Handling transition=" + data.getEventType() + - ", server=" + data.getOrigin() + ", region=" + + boolean lateEvent = createTime < (System.currentTimeMillis() - 15000); + LOG.debug("Handling transition=" + rt.getEventType() + + ", server=" + sn + ", region=" + (prettyPrintedRegionName == null? 
"null": prettyPrintedRegionName) + (lateEvent? ", which is more than 15 seconds late" : "")); RegionState regionState = regionsInTransition.get(encodedName); - switch (data.getEventType()) { + switch (rt.getEventType()) { case M_ZK_REGION_OFFLINE: // Nothing to do. break; @@ -751,7 +761,7 @@ public class AssignmentManager extends ZooKeeperListener { " but region was not first in SPLITTING state; continuing"); } // Check it has daughters. - byte [] payload = data.getPayload(); + byte [] payload = rt.getPayload(); List daughters = null; try { daughters = Writables.getHRegionInfos(payload, 0, payload.length); @@ -772,10 +782,9 @@ public class AssignmentManager extends ZooKeeperListener { break; case M_ZK_REGION_CLOSING: - hri = checkIfInFailover(regionState, encodedName, data); + hri = checkIfInFailover(regionState, encodedName, regionName); if (hri != null) { - regionState = new RegionState(hri, RegionState.State.CLOSING, data - .getStamp(), data.getOrigin()); + regionState = new RegionState(hri, RegionState.State.CLOSING, createTime, sn); regionsInTransition.put(encodedName, regionState); failoverProcessedRegions.put(encodedName, hri); break; @@ -785,21 +794,19 @@ public class AssignmentManager extends ZooKeeperListener { if (regionState == null || (!regionState.isPendingClose() && !regionState.isClosing())) { LOG.warn("Received CLOSING for region " + prettyPrintedRegionName + - " from server " + data.getOrigin() + " but region was in " + + " from server " + sn + " but region was in " + " the state " + regionState + " and not " + "in expected PENDING_CLOSE or CLOSING states"); return; } // Transition to CLOSING (or update stamp if already CLOSING) - regionState.update(RegionState.State.CLOSING, - data.getStamp(), data.getOrigin()); + regionState.update(RegionState.State.CLOSING, createTime, sn); break; case RS_ZK_REGION_CLOSED: - hri = checkIfInFailover(regionState, encodedName, data); + hri = checkIfInFailover(regionState, encodedName, regionName); if (hri != null) 
{ - regionState = new RegionState(hri, RegionState.State.CLOSED, data - .getStamp(), data.getOrigin()); + regionState = new RegionState(hri, RegionState.State.CLOSED, createTime, sn); regionsInTransition.put(encodedName, regionState); removeClosedRegion(regionState.getRegion()); new ClosedRegionHandler(master, this, regionState.getRegion()) @@ -811,7 +818,7 @@ public class AssignmentManager extends ZooKeeperListener { if (regionState == null || (!regionState.isPendingClose() && !regionState.isClosing())) { LOG.warn("Received CLOSED for region " + prettyPrintedRegionName + - " from server " + data.getOrigin() + " but region was in " + + " from server " + sn + " but region was in " + " the state " + regionState + " and not " + "in expected PENDING_CLOSE or CLOSING states"); return; @@ -819,18 +826,16 @@ public class AssignmentManager extends ZooKeeperListener { // Handle CLOSED by assigning elsewhere or stopping if a disable // If we got here all is good. Need to update RegionState -- else // what follows will fail because not in expected state. 
- regionState.update(RegionState.State.CLOSED, - data.getStamp(), data.getOrigin()); + regionState.update(RegionState.State.CLOSED, createTime, sn); removeClosedRegion(regionState.getRegion()); this.executorService.submit(new ClosedRegionHandler(master, this, regionState.getRegion())); break; case RS_ZK_REGION_FAILED_OPEN: - hri = checkIfInFailover(regionState, encodedName, data); + hri = checkIfInFailover(regionState, encodedName, regionName); if (hri != null) { - regionState = new RegionState(hri, RegionState.State.CLOSED, data - .getStamp(), data.getOrigin()); + regionState = new RegionState(hri, RegionState.State.CLOSED, createTime, sn); regionsInTransition.put(encodedName, regionState); new ClosedRegionHandler(master, this, regionState.getRegion()) .process(); @@ -840,22 +845,20 @@ public class AssignmentManager extends ZooKeeperListener { if (regionState == null || (!regionState.isPendingOpen() && !regionState.isOpening())) { LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName + - " from server " + data.getOrigin() + " but region was in " + + " from server " + sn + " but region was in " + " the state " + regionState + " and not in PENDING_OPEN or OPENING"); return; } // Handle this the same as if it were opened and then closed. 
- regionState.update(RegionState.State.CLOSED, - data.getStamp(), data.getOrigin()); + regionState.update(RegionState.State.CLOSED, createTime, sn); this.executorService.submit(new ClosedRegionHandler(master, this, regionState.getRegion())); break; case RS_ZK_REGION_OPENING: - hri = checkIfInFailover(regionState, encodedName, data); + hri = checkIfInFailover(regionState, encodedName, regionName); if (hri != null) { - regionState = new RegionState(hri, RegionState.State.OPENING, data - .getStamp(), data.getOrigin()); + regionState = new RegionState(hri, RegionState.State.OPENING, createTime, sn); regionsInTransition.put(encodedName, regionState); failoverProcessedRegions.put(encodedName, hri); break; @@ -866,24 +869,21 @@ public class AssignmentManager extends ZooKeeperListener { (!regionState.isPendingOpen() && !regionState.isOpening())) { LOG.warn("Received OPENING for region " + prettyPrintedRegionName + - " from server " + data.getOrigin() + " but region was in " + + " from server " + sn + " but region was in " + " the state " + regionState + " and not " + "in expected PENDING_OPEN or OPENING states"); return; } // Transition to OPENING (or update stamp if already OPENING) - regionState.update(RegionState.State.OPENING, - data.getStamp(), data.getOrigin()); + regionState.update(RegionState.State.OPENING, createTime, sn); break; case RS_ZK_REGION_OPENED: - hri = checkIfInFailover(regionState, encodedName, data); + hri = checkIfInFailover(regionState, encodedName, regionName); if (hri != null) { - regionState = new RegionState(hri, RegionState.State.OPEN, data - .getStamp(), data.getOrigin()); + regionState = new RegionState(hri, RegionState.State.OPEN, createTime, sn); regionsInTransition.put(encodedName, regionState); - new OpenedRegionHandler(master, this, regionState.getRegion(), data - .getOrigin(), expectedVersion).process(); + new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion).process(); 
failoverProcessedRegions.put(encodedName, hri); break; } @@ -892,17 +892,15 @@ public class AssignmentManager extends ZooKeeperListener { (!regionState.isPendingOpen() && !regionState.isOpening())) { LOG.warn("Received OPENED for region " + prettyPrintedRegionName + - " from server " + data.getOrigin() + " but region was in " + + " from server " + sn + " but region was in " + " the state " + regionState + " and not " + "in expected PENDING_OPEN or OPENING states"); return; } // Handle OPENED by removing from transition and deleted zk node - regionState.update(RegionState.State.OPEN, - data.getStamp(), data.getOrigin()); + regionState.update(RegionState.State.OPEN, createTime, sn); this.executorService.submit( - new OpenedRegionHandler(master, this, regionState.getRegion(), - data.getOrigin(), expectedVersion)); + new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion)); break; default: @@ -920,12 +918,12 @@ public class AssignmentManager extends ZooKeeperListener { * @return hri */ private HRegionInfo checkIfInFailover(RegionState regionState, - String encodedName, RegionTransitionData data) { + String encodedName, final byte [] regionName) { if (regionState == null && this.failover && (failoverProcessedRegions.containsKey(encodedName) == false || failoverProcessedRegions.get(encodedName) == null)) { HRegionInfo hri = this.failoverProcessedRegions.get(encodedName); - if (hri == null) hri = getHRegionInfo(data); + if (hri == null) hri = getHRegionInfo(regionName); return hri; } return null; @@ -933,18 +931,18 @@ public class AssignmentManager extends ZooKeeperListener { /** * Gets the HRegionInfo from the META table - * @param data + * @param regionName * @return HRegionInfo hri for the region */ - private HRegionInfo getHRegionInfo(RegionTransitionData data) { + private HRegionInfo getHRegionInfo(final byte [] regionName) { Pair p = null; try { - p = MetaReader.getRegion(catalogTracker, data.getRegionName()); + p = 
MetaReader.getRegion(catalogTracker, regionName); if (p == null) return null; return p.getFirst(); } catch (IOException e) { - master.abort("Aborting because error occoured while reading " - + Bytes.toStringBinary(data.getRegionName()) + " from .META.", e); + master.abort("Aborting because error occoured while reading " + + Bytes.toStringBinary(regionName) + " from .META.", e); return null; } } @@ -1042,22 +1040,22 @@ public class AssignmentManager extends ZooKeeperListener { * Handle a ZK unassigned node transition triggered by HBCK repair tool. *

* This is handled in a separate code path because it breaks the normal rules. - * @param data + * @param rt */ - private void handleHBCK(RegionTransitionData data) { - String encodedName = HRegionInfo.encodeRegionName(data.getRegionName()); - LOG.info("Handling HBCK triggered transition=" + data.getEventType() + - ", server=" + data.getOrigin() + ", region=" + + private void handleHBCK(RegionTransition rt) { + String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName()); + LOG.info("Handling HBCK triggered transition=" + rt.getEventType() + + ", server=" + rt.getServerName() + ", region=" + HRegionInfo.prettyPrint(encodedName)); RegionState regionState = regionsInTransition.get(encodedName); - switch (data.getEventType()) { + switch (rt.getEventType()) { case M_ZK_REGION_OFFLINE: HRegionInfo regionInfo = null; if (regionState != null) { regionInfo = regionState.getRegion(); } else { try { - byte[] name = data.getRegionName(); + byte [] name = rt.getRegionName(); Pair p = MetaReader.getRegion(catalogTracker, name); regionInfo = p.getFirst(); } catch (IOException e) { @@ -1072,8 +1070,7 @@ public class AssignmentManager extends ZooKeeperListener { break; default: - LOG.warn("Received unexpected region state from HBCK (" + - data.getEventType() + ")"); + LOG.warn("Received unexpected region state from HBCK: " + rt.toString()); break; } } @@ -1094,18 +1091,7 @@ public class AssignmentManager extends ZooKeeperListener { */ @Override public void nodeCreated(String path) { - if(path.startsWith(watcher.assignmentZNode)) { - try { - Stat stat = new Stat(); - RegionTransitionData data = ZKAssign.getDataAndWatch(watcher, path, stat); - if (data == null) { - return; - } - handleRegion(data, stat.getVersion()); - } catch (KeeperException e) { - master.abort("Unexpected ZK exception reading unassigned node data", e); - } - } + handleAssignmentEvent(path); } /** @@ -1122,17 +1108,21 @@ public class AssignmentManager extends ZooKeeperListener { */ @Override public void 
nodeDataChanged(String path) { - if(path.startsWith(watcher.assignmentZNode)) { - try { - Stat stat = new Stat(); - RegionTransitionData data = ZKAssign.getDataAndWatch(watcher, path, stat); - if (data == null) { - return; - } - handleRegion(data, stat.getVersion()); - } catch (KeeperException e) { - master.abort("Unexpected ZK exception reading unassigned node data", e); - } + handleAssignmentEvent(path); + } + + private void handleAssignmentEvent(final String path) { + if (!path.startsWith(watcher.assignmentZNode)) return; + try { + Stat stat = new Stat(); + byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat); + if (data == null) return; + RegionTransition rt = RegionTransition.parseFrom(data); + handleRegion(rt, stat.getVersion()); + } catch (KeeperException e) { + master.abort("Unexpected ZK exception reading unassigned node data", e); + } catch (DeserializationException e) { + master.abort("Unexpected exception deserializing node data", e); } } @@ -1982,7 +1972,8 @@ public class AssignmentManager extends ZooKeeperListener { + "can't be created."); return; } - } catch (KeeperException e) { + } catch (KeeperException ee) { + Exception e = ee; if (e instanceof NodeExistsException) { // Handle race between master initiated close and regionserver // orchestrated splitting. See if existing node is in a @@ -2003,6 +1994,8 @@ public class AssignmentManager extends ZooKeeperListener { return; } catch (KeeperException ke) { LOG.error("Unexpected zk state", ke); + } catch (DeserializationException de) { + LOG.error("Failed parse", de); } } // If we get here, don't understand whats going on -- abort. @@ -2120,14 +2113,17 @@ public class AssignmentManager extends ZooKeeperListener { * @param path * @return True if znode is in SPLIT or SPLITTING state. * @throws KeeperException Can happen if the znode went away in meantime. 
+ * @throws DeserializationException */ - private boolean isSplitOrSplitting(final String path) throws KeeperException { + private boolean isSplitOrSplitting(final String path) + throws KeeperException, DeserializationException { boolean result = false; // This may fail if the SPLIT or SPLITTING znode gets cleaned up before we // can get data from it. - RegionTransitionData data = ZKAssign.getData(master.getZooKeeper(), path); - EventType evt = data.getEventType(); - switch (evt) { + byte [] data = ZKAssign.getData(master.getZooKeeper(), path); + if (data == null) return false; + RegionTransition rt = RegionTransition.parseFrom(data); + switch (rt.getEventType()) { case RS_ZK_REGION_SPLIT: case RS_ZK_REGION_SPLITTING: result = true; @@ -2654,8 +2650,8 @@ public class AssignmentManager extends ZooKeeperListener { * @throws KeeperException */ private void processDeadServersAndRecoverLostRegions( - Map>> deadServers, - List nodes) throws IOException, KeeperException { + Map>> deadServers, List nodes) + throws IOException, KeeperException { if (null != deadServers) { for (Map.Entry>> deadServer : deadServers.entrySet()) { @@ -2666,17 +2662,25 @@ public class AssignmentManager extends ZooKeeperListener { // If region was in transition (was in zk) force it offline for // reassign try { - RegionTransitionData data = ZKAssign.getData(watcher, - regionInfo.getEncodedName()); + byte [] data = ZKAssign.getData(watcher, regionInfo.getEncodedName()); + if (data == null) { + LOG.warn("No data in znode for " + regionInfo.getEncodedName()); + continue; + } + RegionTransition rt; + try { + rt = RegionTransition.parseFrom(data); + } catch (DeserializationException e) { + LOG.warn("Failed parse of znode data for " + regionInfo.getEncodedName(), e); + continue; + } // If zk node of this region has been updated by a live server, // we consider that this region is being handled. - // So we should skip it and process it in - // processRegionsInTransition. 
- if (data != null && data.getOrigin() != null && - serverManager.isServerOnline(data.getOrigin())) { - LOG.info("The region " + regionInfo.getEncodedName() - + "is being handled on " + data.getOrigin()); + // So we should skip it and process it in processRegionsInTransition. + ServerName sn = rt.getServerName(); + if (isServerOnline(sn)) { + LOG.info("The region " + regionInfo.getEncodedName() + "is being handled on " + sn); continue; } // Process with existing RS shutdown code @@ -2962,27 +2966,28 @@ public class AssignmentManager extends ZooKeeperListener { try { String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()); Stat stat = new Stat(); - RegionTransitionData dataInZNode = ZKAssign.getDataNoWatch(watcher, node, - stat); - if (dataInZNode == null) { + byte [] data = ZKAssign.getDataNoWatch(watcher, node, stat); + if (data == null) { LOG.warn("Data is null, node " + node + " no longer exists"); return; } - if (dataInZNode.getEventType() == EventType.RS_ZK_REGION_OPENED) { + RegionTransition rt = RegionTransition.parseFrom(data); + EventType et = rt.getEventType(); + if (et == EventType.RS_ZK_REGION_OPENED) { LOG.debug("Region has transitioned to OPENED, allowing " + "watched event handlers to process"); return; - } else if (dataInZNode.getEventType() != EventType.RS_ZK_REGION_OPENING && - dataInZNode.getEventType() != EventType.RS_ZK_REGION_FAILED_OPEN ) { - LOG.warn("While timing out a region in state OPENING, " - + "found ZK node in unexpected state: " - + dataInZNode.getEventType()); + } else if (et != EventType.RS_ZK_REGION_OPENING && et != EventType.RS_ZK_REGION_FAILED_OPEN ) { + LOG.warn("While timing out a region, found ZK node in unexpected state: " + et); return; } invokeAssign(regionInfo); } catch (KeeperException ke) { LOG.error("Unexpected ZK exception timing out CLOSING region", ke); return; + } catch (DeserializationException e) { + LOG.error("Unexpected exception parsing CLOSING region", e); + return; } return; } @@ -3014,16 
+3019,18 @@ public class AssignmentManager extends ZooKeeperListener { * @return whether the serverName currently hosts the region */ public boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) { - RegionTransitionData data = null; + RegionTransition rt = null; try { - data = ZKAssign.getData(master.getZooKeeper(), hri.getEncodedName()); + byte [] data = ZKAssign.getData(master.getZooKeeper(), hri.getEncodedName()); + if (data == null) return false; + rt = RegionTransition.parseFrom(data); } catch (KeeperException e) { - master.abort("Unexpected ZK exception reading unassigned node for region=" - + hri.getEncodedName(), e); + master.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e); + } catch (DeserializationException e) { + master.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e); } - - ServerName addressFromZK = (data != null && data.getOrigin() != null) ? - data.getOrigin() : null; + + ServerName addressFromZK = rt.getServerName(); if (addressFromZK != null) { // if we get something from ZK, we will use the data boolean matchZK = (addressFromZK != null && @@ -3043,6 +3050,7 @@ public class AssignmentManager extends ZooKeeperListener { return matchAM; } + /** * Process shutdown server removing any assignments. * @param sn Server that went down. @@ -3386,7 +3394,7 @@ public class AssignmentManager extends ZooKeeperListener { * @return True if online. 
*/ public boolean isServerOnline(ServerName serverName) { - return this.serverManager.isServerOnline(serverName); + return serverName != null && this.serverManager.isServerOnline(serverName); } /** * Shutdown the threadpool executor service diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 81e9023..353d564 100644 --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -1427,7 +1428,7 @@ Server { List backupMasterStrings; try { backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper, - this.zooKeeper.backupMasterAddressesZNode); + this.zooKeeper.backupMasterAddressesZNode); } catch (KeeperException e) { LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e); backupMasterStrings = new ArrayList(0); @@ -1436,9 +1437,17 @@ Server { backupMasterStrings.size()); for (String s: backupMasterStrings) { try { - byte[] bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(this.zooKeeper.backupMasterAddressesZNode, s)); + byte [] bytes = + ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(this.zooKeeper.backupMasterAddressesZNode, s)); if (bytes != null) { - backupMasters.add(ZKUtil.znodeContentToServerName(bytes)); + ServerName sn; + try { + sn = ServerName.parseFrom(bytes); + } catch (DeserializationException e) { + LOG.warn("Failed parse, skipping registering backup server", e); + continue; + } + backupMasters.add(sn); } } catch (KeeperException e) { LOG.warn(this.zooKeeper.prefix("Unable to get information about " + diff 
--git src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 692f194..f203e0d 100644 --- src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -104,7 +104,7 @@ public class MasterFileSystem { conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true); if (this.distributedLogSplitting) { this.splitLogManager = new SplitLogManager(master.getZooKeeper(), - master.getConfiguration(), master, master.getServerName().toString()); + master.getConfiguration(), master, master.getServerName()); this.splitLogManager.finishInitialization(); } else { this.splitLogManager = null; diff --git src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 919c65f..edfff8c 100644 --- src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -1,7 +1,5 @@ /** - * Copyright 2011 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. 
The ASF licenses this file @@ -19,14 +17,19 @@ */ package org.apache.hadoop.hbase.master; -import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; +import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK; +import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE; +import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED; +import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE; +import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS; +import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; -import java.util.Set; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -38,7 +41,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.SplitLogCounters; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -59,36 +65,31 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; - -import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.*; -import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.*; /** * Distributes the task of log splitting to the available 
region servers. * Coordination happens via zookeeper. For every log file that has to be split a - * znode is created under /hbase/splitlog. SplitLogWorkers race to grab a task. + * znode is created under /hbase/splitlog. SplitLogWorkers race to grab a task. * - * SplitLogManager monitors the task znodes that it creates using the + *

SplitLogManager monitors the task znodes that it creates using the * timeoutMonitor thread. If a task's progress is slow then - * resubmit(String, boolean) will take away the task from the owner - * {@link SplitLogWorker} and the task will be - * upforgrabs again. When the task is done then the task's znode is deleted by - * SplitLogManager. + * {@link #resubmit(String, Task, ResubmitDirective)} will take away the task from the owner + * {@link SplitLogWorker} and the task will be up for grabs again. When the task is done, the + * task's znode is deleted by SplitLogManager. * - * Clients call {@link #splitLogDistributed(Path)} to split a region server's + *

Clients call {@link #splitLogDistributed(Path)} to split a region server's * log files. The caller thread waits in this method until all the log files * have been split. * - * All the zookeeper calls made by this class are asynchronous. This is mainly + *

All the zookeeper calls made by this class are asynchronous. This is mainly * to help reduce response time seen by the callers. * - * There is race in this design between the SplitLogManager and the + *

There is a race in this design between the SplitLogManager and the * SplitLogWorker. SplitLogManager might re-queue a task that has in reality * already been completed by a SplitLogWorker. We rely on the idempotency of * the log splitting task for correctness. * - * It is also assumed that every log splitting task is unique and once + *

It is also assumed that every log splitting task is unique and once * completed (either with success or with error) it will be not be submitted * again. If a task is resubmitted then there is a risk that old "delete task" * can delete the re-submission. @@ -97,8 +98,13 @@ import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.* public class SplitLogManager extends ZooKeeperListener { private static final Log LOG = LogFactory.getLog(SplitLogManager.class); + public static final int DEFAULT_TIMEOUT = 25000; // 25 sec + public static final int DEFAULT_ZK_RETRIES = 3; + public static final int DEFAULT_MAX_RESUBMIT = 3; + public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min + private final Stoppable stopper; - private final String serverName; + private final ServerName serverName; private final TaskFinisher taskFinisher; private FileSystem fs; private Configuration conf; @@ -110,8 +116,7 @@ public class SplitLogManager extends ZooKeeperListener { private long lastNodeCreateTime = Long.MAX_VALUE; public boolean ignoreZKDeleteForTesting = false; - private ConcurrentMap tasks = - new ConcurrentHashMap(); + private ConcurrentMap tasks = new ConcurrentHashMap(); private TimeoutMonitor timeoutMonitor; private Set deadWorkers = null; @@ -130,7 +135,7 @@ public class SplitLogManager extends ZooKeeperListener { * @param serverName */ public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, - Stoppable stopper, String serverName) { + Stoppable stopper, ServerName serverName) { this(zkw, conf, stopper, serverName, new TaskFinisher() { @Override public Status finish(String workerName, String logfile) { @@ -159,20 +164,16 @@ public class SplitLogManager extends ZooKeeperListener { * @param tf task finisher */ public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, - Stoppable stopper, String serverName, TaskFinisher tf) { + Stoppable stopper, ServerName serverName, TaskFinisher tf) { super(zkw); 
this.taskFinisher = tf; this.conf = conf; this.stopper = stopper; - this.zkretries = conf.getLong("hbase.splitlog.zk.retries", - ZKSplitLog.DEFAULT_ZK_RETRIES); - this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", - ZKSplitLog.DEFAULT_MAX_RESUBMIT); - this.timeout = conf.getInt("hbase.splitlog.manager.timeout", - ZKSplitLog.DEFAULT_TIMEOUT); + this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES); + this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT); + this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT); this.unassignedTimeout = - conf.getInt("hbase.splitlog.manager.unassigned.timeout", - ZKSplitLog.DEFAULT_UNASSIGNED_TIMEOUT); + conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); LOG.debug("timeout = " + timeout); LOG.debug("unassigned timeout = " + unassignedTimeout); @@ -244,7 +245,7 @@ public class SplitLogManager extends ZooKeeperListener { FileStatus[] logfiles = getFileList(logDirs); status.setStatus("Checking directory contents..."); LOG.debug("Scheduling batch of logs to split"); - tot_mgr_log_split_batch_start.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet(); LOG.info("started splitting logs in " + logDirs); long t = EnvironmentEdgeManager.currentTimeMillis(); long totalSize = 0; @@ -264,7 +265,7 @@ public class SplitLogManager extends ZooKeeperListener { waitForSplittingCompletion(batch, status); if (batch.done != batch.installed) { batch.isDead = true; - tot_mgr_log_split_batch_err.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet(); LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed + " but only " + batch.done + " done"); throw new IOException("error or interrupt while splitting logs in " @@ -285,7 +286,7 @@ public class SplitLogManager extends ZooKeeperListener { LOG.warn("Unable to delete log src dir. 
Ignoring. " + logDir, ioe); } } - tot_mgr_log_split_batch_success.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet(); } String msg = "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed + " log files in " + logDirs + " in " + @@ -303,7 +304,7 @@ public class SplitLogManager extends ZooKeeperListener { * @return true if a new entry is created, false if it is already there. */ boolean enqueueSplitTask(String taskname, TaskBatch batch) { - tot_mgr_log_split_start.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_start.incrementAndGet(); String path = ZKSplitLog.getEncodedNodeName(watcher, taskname); Task oldtask = createTaskIfAbsent(path, batch); if (oldtask == null) { @@ -340,17 +341,17 @@ public class SplitLogManager extends ZooKeeperListener { Task task = tasks.get(path); if (task == null) { if (!ZKSplitLog.isRescanNode(watcher, path)) { - tot_mgr_unacquired_orphan_done.incrementAndGet(); + SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet(); LOG.debug("unacquired orphan task is done " + path); } } else { synchronized (task) { if (task.status == IN_PROGRESS) { if (status == SUCCESS) { - tot_mgr_log_split_success.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_success.incrementAndGet(); LOG.info("Done splitting " + path); } else { - tot_mgr_log_split_err.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_err.incrementAndGet(); LOG.warn("Error splitting " + path); } task.status = status; @@ -376,10 +377,9 @@ public class SplitLogManager extends ZooKeeperListener { } private void createNode(String path, Long retry_count) { - ZKUtil.asyncCreate(this.watcher, path, - TaskState.TASK_UNASSIGNED.get(serverName), new CreateAsyncCallback(), - retry_count); - tot_mgr_node_create_queued.incrementAndGet(); + SplitLogTask slt = new SplitLogTask.Unassigned(serverName); + ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count); + 
SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet(); return; } @@ -400,7 +400,7 @@ public class SplitLogManager extends ZooKeeperListener { this.watcher.getRecoverableZooKeeper().getZooKeeper(). getData(path, this.watcher, new GetDataAsyncCallback(), retry_count); - tot_mgr_get_data_queued.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); } private void tryGetDataSetWatch(String path) { @@ -408,37 +408,36 @@ public class SplitLogManager extends ZooKeeperListener { this.watcher.getRecoverableZooKeeper().getZooKeeper(). getData(path, this.watcher, new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */); - tot_mgr_get_data_queued.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); } - private void getDataSetWatchSuccess(String path, byte[] data, int version) { + private void getDataSetWatchSuccess(String path, byte[] data, int version) + throws DeserializationException { if (data == null) { if (version == Integer.MIN_VALUE) { // assume all done. The task znode suddenly disappeared. 
setDone(path, SUCCESS); return; } - tot_mgr_null_data.incrementAndGet(); + SplitLogCounters.tot_mgr_null_data.incrementAndGet(); LOG.fatal("logic error - got null data " + path); setDone(path, FAILURE); return; } data = this.watcher.getRecoverableZooKeeper().removeMetaData(data); - // LOG.debug("set watch on " + path + " got data " + new String(data)); - if (TaskState.TASK_UNASSIGNED.equals(data)) { + SplitLogTask slt = SplitLogTask.parseFrom(data); + if (slt.isUnassigned()) { LOG.debug("task not yet acquired " + path + " ver = " + version); handleUnassignedTask(path); - } else if (TaskState.TASK_OWNED.equals(data)) { - heartbeat(path, version, - TaskState.TASK_OWNED.getWriterName(data)); - } else if (TaskState.TASK_RESIGNED.equals(data)) { + } else if (slt.isOwned()) { + heartbeat(path, version, slt.toString()); + } else if (slt.isResigned()) { LOG.info("task " + path + " entered state " + new String(data)); resubmitOrFail(path, FORCE); - } else if (TaskState.TASK_DONE.equals(data)) { + } else if (slt.isDone()) { LOG.info("task " + path + " entered state " + new String(data)); if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) { - if (taskFinisher.finish(TaskState.TASK_DONE.getWriterName(data), - ZKSplitLog.getFileName(path)) == Status.DONE) { + if (taskFinisher.finish(slt.toString(), ZKSplitLog.getFileName(path)) == Status.DONE) { setDone(path, SUCCESS); } else { resubmitOrFail(path, CHECK); @@ -446,12 +445,11 @@ public class SplitLogManager extends ZooKeeperListener { } else { setDone(path, SUCCESS); } - } else if (TaskState.TASK_ERR.equals(data)) { + } else if (slt.isErr()) { LOG.info("task " + path + " entered state " + new String(data)); resubmitOrFail(path, CHECK); } else { - LOG.fatal("logic error - unexpected zk state for path = " + path - + " data = " + new String(data)); + LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + new String(data)); setDone(path, FAILURE); } } @@ -483,8 +481,7 @@ public class 
SplitLogManager extends ZooKeeperListener { } } - private void heartbeat(String path, int new_version, - String workerName) { + private void heartbeat(String path, int new_version, String workerName) { Task task = findOrCreateOrphanTask(path); if (new_version != task.last_version) { if (task.isUnassigned()) { @@ -492,7 +489,7 @@ public class SplitLogManager extends ZooKeeperListener { } task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName); - tot_mgr_heartbeat.incrementAndGet(); + SplitLogCounters.tot_mgr_heartbeat.incrementAndGet(); } else { // duplicate heartbeats - heartbeats w/o zk node version // changing - are possible. The timeout thread does @@ -502,8 +499,7 @@ public class SplitLogManager extends ZooKeeperListener { return; } - private boolean resubmit(String path, Task task, - ResubmitDirective directive) { + private boolean resubmit(String path, Task task, ResubmitDirective directive) { // its ok if this thread misses the update to task.deleted. It will // fail later if (task.status != IN_PROGRESS) { @@ -518,7 +514,7 @@ public class SplitLogManager extends ZooKeeperListener { if (task.unforcedResubmits >= resubmit_threshold) { if (!task.resubmitThresholdReached) { task.resubmitThresholdReached = true; - tot_mgr_resubmit_threshold_reached.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet(); LOG.info("Skipping resubmissions of task " + path + " because threshold " + resubmit_threshold + " reached"); } @@ -533,9 +529,8 @@ public class SplitLogManager extends ZooKeeperListener { task.incarnation++; try { // blocking zk call but this is done from the timeout thread - if (ZKUtil.setData(this.watcher, path, - TaskState.TASK_UNASSIGNED.get(serverName), - version) == false) { + SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName); + if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) { LOG.debug("failed to resubmit task " + path + " version changed"); 
task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis()); @@ -544,15 +539,20 @@ public class SplitLogManager extends ZooKeeperListener { } catch (NoNodeException e) { LOG.warn("failed to resubmit because znode doesn't exist " + path + " task done (or forced done by removing the znode)"); - getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); + try { + getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); + } catch (DeserializationException e1) { + LOG.debug("failed to re-resubmit task " + path + " because of deserialization issue"); + task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis()); + return false; + } return false; } catch (KeeperException.BadVersionException e) { - LOG.debug("failed to resubmit task " + path + - " version changed"); + LOG.debug("failed to resubmit task " + path + " version changed"); task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis()); return false; } catch (KeeperException e) { - tot_mgr_resubmit_failed.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet(); LOG.warn("failed to resubmit " + path, e); return false; } @@ -562,7 +562,7 @@ public class SplitLogManager extends ZooKeeperListener { } task.setUnassigned(); createRescanNode(Long.MAX_VALUE); - tot_mgr_resubmit.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit.incrementAndGet(); return true; } @@ -573,7 +573,7 @@ public class SplitLogManager extends ZooKeeperListener { } private void deleteNode(String path, Long retries) { - tot_mgr_node_delete_queued.incrementAndGet(); + SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet(); // Once a task znode is ready for delete, that is it is in the TASK_DONE // state, then no one should be writing to it anymore. That is no one // will be updating the znode version any more. 
@@ -590,9 +590,9 @@ public class SplitLogManager extends ZooKeeperListener { task = tasks.remove(path); if (task == null) { if (ZKSplitLog.isRescanNode(watcher, path)) { - tot_mgr_rescan_deleted.incrementAndGet(); + SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet(); } - tot_mgr_missing_state_in_delete.incrementAndGet(); + SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet(); LOG.debug("deleted task without in memory state " + path); return; } @@ -600,7 +600,7 @@ public class SplitLogManager extends ZooKeeperListener { task.status = DELETED; task.notify(); } - tot_mgr_task_deleted.incrementAndGet(); + SplitLogCounters.tot_mgr_task_deleted.incrementAndGet(); } private void deleteNodeFailure(String path) { @@ -622,16 +622,16 @@ public class SplitLogManager extends ZooKeeperListener { // might miss the watch-trigger that creation of RESCAN node provides. // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks // therefore this behavior is safe. + SplitLogTask slt = new SplitLogTask.Done(this.serverName); this.watcher.getRecoverableZooKeeper().getZooKeeper(). 
- create(ZKSplitLog.getRescanNode(watcher), - TaskState.TASK_DONE.get(serverName), Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL_SEQUENTIAL, + create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries)); } private void createRescanSuccess(String path) { lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis(); - tot_mgr_rescan.incrementAndGet(); + SplitLogCounters.tot_mgr_rescan.incrementAndGet(); getDataSetWatch(path, zkretries); } @@ -675,7 +675,7 @@ public class SplitLogManager extends ZooKeeperListener { while (oldtask.status == FAILURE) { LOG.debug("wait for status of task " + path + " to change to DELETED"); - tot_mgr_wait_for_zk_delete.incrementAndGet(); + SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet(); try { oldtask.wait(); } catch (InterruptedException e) { @@ -713,7 +713,7 @@ public class SplitLogManager extends ZooKeeperListener { task = tasks.putIfAbsent(path, orphanTask); if (task == null) { LOG.info("creating orphan task " + path); - tot_mgr_orphan_task_acquired.incrementAndGet(); + SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); task = orphanTask; } return task; @@ -905,7 +905,7 @@ public class SplitLogManager extends ZooKeeperListener { } found_assigned_task = true; if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { - tot_mgr_resubmit_dead_server_task.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet(); if (resubmit(path, task, FORCE)) { resubmitted++; } else { @@ -948,7 +948,7 @@ public class SplitLogManager extends ZooKeeperListener { } } createRescanNode(Long.MAX_VALUE); - tot_mgr_resubmit_unassigned.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet(); LOG.debug("resubmitting unassigned task(s) after timeout"); } } @@ -963,7 +963,7 @@ public class SplitLogManager extends ZooKeeperListener { @Override public 
void processResult(int rc, String path, Object ctx, String name) { - tot_mgr_node_create_result.incrementAndGet(); + SplitLogCounters.tot_mgr_node_create_result.incrementAndGet(); if (rc != 0) { if (rc == KeeperException.Code.NODEEXISTS.intValue()) { // What if there is a delete pending against this pre-existing @@ -973,16 +973,16 @@ public class SplitLogManager extends ZooKeeperListener { // And all code pieces correctly handle the case of suddenly // disappearing task-znode. LOG.debug("found pre-existing znode " + path); - tot_mgr_node_already_exists.incrementAndGet(); + SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet(); } else { Long retry_count = (Long)ctx; LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + path + " remaining retries=" + retry_count); if (retry_count == 0) { - tot_mgr_node_create_err.incrementAndGet(); + SplitLogCounters.tot_mgr_node_create_err.incrementAndGet(); createNodeFailure(path); } else { - tot_mgr_node_create_retry.incrementAndGet(); + SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet(); createNode(path, retry_count - 1); } return; @@ -1002,19 +1002,23 @@ public class SplitLogManager extends ZooKeeperListener { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - tot_mgr_get_data_result.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_result.incrementAndGet(); if (rc != 0) { if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) { LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries."); return; } if (rc == KeeperException.Code.NONODE.intValue()) { - tot_mgr_get_data_nonode.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet(); // The task znode has been deleted. Must be some pending delete // that deleted the task. Assume success because a task-znode is // is only deleted after TaskFinisher is successful. 
LOG.warn("task znode " + path + " vanished."); - getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); + try { + getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); + } catch (DeserializationException e) { + LOG.warn("Deserialization problem", e); + } return; } Long retry_count = (Long) ctx; @@ -1027,15 +1031,19 @@ public class SplitLogManager extends ZooKeeperListener { LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path + " remaining retries=" + retry_count); if (retry_count == 0) { - tot_mgr_get_data_err.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_err.incrementAndGet(); getDataSetWatchFailure(path); } else { - tot_mgr_get_data_retry.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet(); getDataSetWatch(path, retry_count - 1); } return; } - getDataSetWatchSuccess(path, data, stat.getVersion()); + try { + getDataSetWatchSuccess(path, data, stat.getVersion()); + } catch (DeserializationException e) { + LOG.warn("Deserialization problem", e); + } return; } } @@ -1049,10 +1057,10 @@ public class SplitLogManager extends ZooKeeperListener { @Override public void processResult(int rc, String path, Object ctx) { - tot_mgr_node_delete_result.incrementAndGet(); + SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet(); if (rc != 0) { if (rc != KeeperException.Code.NONODE.intValue()) { - tot_mgr_node_delete_err.incrementAndGet(); + SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet(); Long retry_count = (Long) ctx; LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries=" + retry_count); @@ -1139,10 +1147,12 @@ public class SplitLogManager extends ZooKeeperListener { */ public Status finish(String workerName, String taskname); } + enum ResubmitDirective { CHECK(), FORCE(); } + enum TerminationStatus { IN_PROGRESS("in_progress"), SUCCESS("success"), diff --git src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java 
src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 994cb76..226f4aa 100644 --- src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -136,6 +137,7 @@ public final class ProtobufUtil { * for preamble. */ static final byte [] PB_MAGIC = new byte [] {'P', 'B', 'U', 'F'}; + private static final String PB_MAGIC_STR = Bytes.toString(PB_MAGIC); /** * Prepend the passed bytes with four bytes of magic, {@link #PB_MAGIC}, to flag what @@ -158,6 +160,16 @@ public final class ProtobufUtil { } /** + * @param bytes + * @throws DeserializationException if we are missing the pb magic prefix + */ + public static void expectPBMagicPrefix(final byte [] bytes) throws DeserializationException { + if (!isPBMagicPrefix(bytes)) { + throw new DeserializationException("Missing pb magic " + PB_MAGIC_STR + " prefix"); + } + } + + /** * @return Length of {@link #PB_MAGIC} */ public static int lengthOfPBMagic() { @@ -231,6 +243,7 @@ public final class ProtobufUtil { * * @param serverName the ServerName to convert * @return the converted protocol buffer ServerName + * @see #toServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName) */ public static HBaseProtos.ServerName toServerName(final ServerName serverName) { @@ -253,8 +266,7 @@ public final class ProtobufUtil { * @param proto the protocol buffer ServerName to convert * @return the converted ServerName */ - public static ServerName toServerName( - final HBaseProtos.ServerName proto) { + public static ServerName toServerName(final HBaseProtos.ServerName proto) { if (proto == null) return null; String hostName 
= proto.getHostName(); long startCode = -1; @@ -274,8 +286,7 @@ public final class ProtobufUtil { * @param proto the RegionInfo to convert * @return the converted HRegionInfo */ - public static HRegionInfo - toRegionInfo(final RegionInfo proto) { + public static HRegionInfo toRegionInfo(final RegionInfo proto) { if (proto == null) return null; byte[] tableName = proto.getTableName().toByteArray(); if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { @@ -310,8 +321,7 @@ public final class ProtobufUtil { * @param info the HRegionInfo to convert * @return the converted RegionInfo */ - public static RegionInfo - toRegionInfo(final HRegionInfo info) { + public static RegionInfo toRegionInfo(final HRegionInfo info) { if (info == null) return null; RegionInfo.Builder builder = RegionInfo.newBuilder(); builder.setTableName(ByteString.copyFrom(info.getTableName())); diff --git src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java index 8457bdc..6be9b7c 100644 --- src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java +++ src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java @@ -1786,6 +1786,1403 @@ public final class ZooKeeperProtos { // @@protoc_insertion_point(class_scope:ClusterUp) } + public interface RegionTransitionOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required uint32 eventTypeCode = 1; + boolean hasEventTypeCode(); + int getEventTypeCode(); + + // required bytes regionName = 2; + boolean hasRegionName(); + com.google.protobuf.ByteString getRegionName(); + + // required uint64 createTime = 3; + boolean hasCreateTime(); + long getCreateTime(); + + // optional .ServerName originServerName = 4; + boolean hasOriginServerName(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getOriginServerName(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder 
getOriginServerNameOrBuilder(); + + // optional bytes payload = 5; + boolean hasPayload(); + com.google.protobuf.ByteString getPayload(); + } + public static final class RegionTransition extends + com.google.protobuf.GeneratedMessage + implements RegionTransitionOrBuilder { + // Use RegionTransition.newBuilder() to construct. + private RegionTransition(Builder builder) { + super(builder); + } + private RegionTransition(boolean noInit) {} + + private static final RegionTransition defaultInstance; + public static RegionTransition getDefaultInstance() { + return defaultInstance; + } + + public RegionTransition getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_RegionTransition_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_RegionTransition_fieldAccessorTable; + } + + private int bitField0_; + // required uint32 eventTypeCode = 1; + public static final int EVENTTYPECODE_FIELD_NUMBER = 1; + private int eventTypeCode_; + public boolean hasEventTypeCode() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public int getEventTypeCode() { + return eventTypeCode_; + } + + // required bytes regionName = 2; + public static final int REGIONNAME_FIELD_NUMBER = 2; + private com.google.protobuf.ByteString regionName_; + public boolean hasRegionName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public com.google.protobuf.ByteString getRegionName() { + return regionName_; + } + + // required uint64 createTime = 3; + public static final int CREATETIME_FIELD_NUMBER = 3; + private long createTime_; + public boolean hasCreateTime() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getCreateTime() { + return 
createTime_; + } + + // optional .ServerName originServerName = 4; + public static final int ORIGINSERVERNAME_FIELD_NUMBER = 4; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName originServerName_; + public boolean hasOriginServerName() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getOriginServerName() { + return originServerName_; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getOriginServerNameOrBuilder() { + return originServerName_; + } + + // optional bytes payload = 5; + public static final int PAYLOAD_FIELD_NUMBER = 5; + private com.google.protobuf.ByteString payload_; + public boolean hasPayload() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public com.google.protobuf.ByteString getPayload() { + return payload_; + } + + private void initFields() { + eventTypeCode_ = 0; + regionName_ = com.google.protobuf.ByteString.EMPTY; + createTime_ = 0L; + originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + payload_ = com.google.protobuf.ByteString.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasEventTypeCode()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasRegionName()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasCreateTime()) { + memoizedIsInitialized = 0; + return false; + } + if (hasOriginServerName()) { + if (!getOriginServerName().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeUInt32(1, 
eventTypeCode_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, regionName_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeUInt64(3, createTime_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeMessage(4, originServerName_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(5, payload_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(1, eventTypeCode_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, regionName_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(3, createTime_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(4, originServerName_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(5, payload_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition other = 
(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition) obj; + + boolean result = true; + result = result && (hasEventTypeCode() == other.hasEventTypeCode()); + if (hasEventTypeCode()) { + result = result && (getEventTypeCode() + == other.getEventTypeCode()); + } + result = result && (hasRegionName() == other.hasRegionName()); + if (hasRegionName()) { + result = result && getRegionName() + .equals(other.getRegionName()); + } + result = result && (hasCreateTime() == other.hasCreateTime()); + if (hasCreateTime()) { + result = result && (getCreateTime() + == other.getCreateTime()); + } + result = result && (hasOriginServerName() == other.hasOriginServerName()); + if (hasOriginServerName()) { + result = result && getOriginServerName() + .equals(other.getOriginServerName()); + } + result = result && (hasPayload() == other.hasPayload()); + if (hasPayload()) { + result = result && getPayload() + .equals(other.getPayload()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasEventTypeCode()) { + hash = (37 * hash) + EVENTTYPECODE_FIELD_NUMBER; + hash = (53 * hash) + getEventTypeCode(); + } + if (hasRegionName()) { + hash = (37 * hash) + REGIONNAME_FIELD_NUMBER; + hash = (53 * hash) + getRegionName().hashCode(); + } + if (hasCreateTime()) { + hash = (37 * hash) + CREATETIME_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getCreateTime()); + } + if (hasOriginServerName()) { + hash = (37 * hash) + ORIGINSERVERNAME_FIELD_NUMBER; + hash = (53 * hash) + getOriginServerName().hashCode(); + } + if (hasPayload()) { + hash = (37 * hash) + PAYLOAD_FIELD_NUMBER; + hash = (53 * hash) + getPayload().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition 
parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition 
parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransitionOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_RegionTransition_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_RegionTransition_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getOriginServerNameFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + eventTypeCode_ = 0; + bitField0_ = (bitField0_ & ~0x00000001); + regionName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + createTime_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); + if (originServerNameBuilder_ == null) { + originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + } else { + originServerNameBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000008); + payload_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition build() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition result = buildPartial(); + if (!result.isInitialized()) { + throw 
newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition result = new org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.eventTypeCode_ = eventTypeCode_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.regionName_ = regionName_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.createTime_ = createTime_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + if (originServerNameBuilder_ == null) { + result.originServerName_ = originServerName_; + } else { + result.originServerName_ = originServerNameBuilder_.build(); + } + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.payload_ = payload_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public 
Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.getDefaultInstance()) return this; + if (other.hasEventTypeCode()) { + setEventTypeCode(other.getEventTypeCode()); + } + if (other.hasRegionName()) { + setRegionName(other.getRegionName()); + } + if (other.hasCreateTime()) { + setCreateTime(other.getCreateTime()); + } + if (other.hasOriginServerName()) { + mergeOriginServerName(other.getOriginServerName()); + } + if (other.hasPayload()) { + setPayload(other.getPayload()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasEventTypeCode()) { + + return false; + } + if (!hasRegionName()) { + + return false; + } + if (!hasCreateTime()) { + + return false; + } + if (hasOriginServerName()) { + if (!getOriginServerName().isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + eventTypeCode_ = input.readUInt32(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + regionName_ = input.readBytes(); + break; + } + case 24: { + bitField0_ |= 0x00000004; + createTime_ = input.readUInt64(); + break; + } + case 34: { + 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(); + if (hasOriginServerName()) { + subBuilder.mergeFrom(getOriginServerName()); + } + input.readMessage(subBuilder, extensionRegistry); + setOriginServerName(subBuilder.buildPartial()); + break; + } + case 42: { + bitField0_ |= 0x00000010; + payload_ = input.readBytes(); + break; + } + } + } + } + + private int bitField0_; + + // required uint32 eventTypeCode = 1; + private int eventTypeCode_ ; + public boolean hasEventTypeCode() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public int getEventTypeCode() { + return eventTypeCode_; + } + public Builder setEventTypeCode(int value) { + bitField0_ |= 0x00000001; + eventTypeCode_ = value; + onChanged(); + return this; + } + public Builder clearEventTypeCode() { + bitField0_ = (bitField0_ & ~0x00000001); + eventTypeCode_ = 0; + onChanged(); + return this; + } + + // required bytes regionName = 2; + private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasRegionName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public com.google.protobuf.ByteString getRegionName() { + return regionName_; + } + public Builder setRegionName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + regionName_ = value; + onChanged(); + return this; + } + public Builder clearRegionName() { + bitField0_ = (bitField0_ & ~0x00000002); + regionName_ = getDefaultInstance().getRegionName(); + onChanged(); + return this; + } + + // required uint64 createTime = 3; + private long createTime_ ; + public boolean hasCreateTime() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getCreateTime() { + return createTime_; + } + public Builder setCreateTime(long value) { + bitField0_ |= 0x00000004; + createTime_ = value; 
+ onChanged(); + return this; + } + public Builder clearCreateTime() { + bitField0_ = (bitField0_ & ~0x00000004); + createTime_ = 0L; + onChanged(); + return this; + } + + // optional .ServerName originServerName = 4; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> originServerNameBuilder_; + public boolean hasOriginServerName() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getOriginServerName() { + if (originServerNameBuilder_ == null) { + return originServerName_; + } else { + return originServerNameBuilder_.getMessage(); + } + } + public Builder setOriginServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) { + if (originServerNameBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + originServerName_ = value; + onChanged(); + } else { + originServerNameBuilder_.setMessage(value); + } + bitField0_ |= 0x00000008; + return this; + } + public Builder setOriginServerName( + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder builderForValue) { + if (originServerNameBuilder_ == null) { + originServerName_ = builderForValue.build(); + onChanged(); + } else { + originServerNameBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000008; + return this; + } + public Builder mergeOriginServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) { + if (originServerNameBuilder_ == null) { + if (((bitField0_ & 0x00000008) == 0x00000008) && + originServerName_ != 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) { + originServerName_ = + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(originServerName_).mergeFrom(value).buildPartial(); + } else { + originServerName_ = value; + } + onChanged(); + } else { + originServerNameBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000008; + return this; + } + public Builder clearOriginServerName() { + if (originServerNameBuilder_ == null) { + originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + onChanged(); + } else { + originServerNameBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000008); + return this; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getOriginServerNameBuilder() { + bitField0_ |= 0x00000008; + onChanged(); + return getOriginServerNameFieldBuilder().getBuilder(); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getOriginServerNameOrBuilder() { + if (originServerNameBuilder_ != null) { + return originServerNameBuilder_.getMessageOrBuilder(); + } else { + return originServerName_; + } + } + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> + getOriginServerNameFieldBuilder() { + if (originServerNameBuilder_ == null) { + originServerNameBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>( + originServerName_, + getParentForChildren(), + isClean()); + originServerName_ = null; + } + return originServerNameBuilder_; + } + + // 
optional bytes payload = 5; + private com.google.protobuf.ByteString payload_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasPayload() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public com.google.protobuf.ByteString getPayload() { + return payload_; + } + public Builder setPayload(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + payload_ = value; + onChanged(); + return this; + } + public Builder clearPayload() { + bitField0_ = (bitField0_ & ~0x00000010); + payload_ = getDefaultInstance().getPayload(); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:RegionTransition) + } + + static { + defaultInstance = new RegionTransition(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:RegionTransition) + } + + public interface SplitLogTaskOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required .SplitLogTask.State state = 1; + boolean hasState(); + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State getState(); + + // required .ServerName serverName = 2; + boolean hasServerName(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getServerName(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder(); + } + public static final class SplitLogTask extends + com.google.protobuf.GeneratedMessage + implements SplitLogTaskOrBuilder { + // Use SplitLogTask.newBuilder() to construct. 
+ private SplitLogTask(Builder builder) { + super(builder); + } + private SplitLogTask(boolean noInit) {} + + private static final SplitLogTask defaultInstance; + public static SplitLogTask getDefaultInstance() { + return defaultInstance; + } + + public SplitLogTask getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_SplitLogTask_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_SplitLogTask_fieldAccessorTable; + } + + public enum State + implements com.google.protobuf.ProtocolMessageEnum { + UNASSIGNED(0, 0), + OWNED(1, 1), + RESIGNED(2, 2), + DONE(3, 3), + ERR(4, 4), + ; + + public static final int UNASSIGNED_VALUE = 0; + public static final int OWNED_VALUE = 1; + public static final int RESIGNED_VALUE = 2; + public static final int DONE_VALUE = 3; + public static final int ERR_VALUE = 4; + + + public final int getNumber() { return value; } + + public static State valueOf(int value) { + switch (value) { + case 0: return UNASSIGNED; + case 1: return OWNED; + case 2: return RESIGNED; + case 3: return DONE; + case 4: return ERR; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public State findValueByNumber(int number) { + return State.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return 
getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDescriptor().getEnumTypes().get(0); + } + + private static final State[] VALUES = { + UNASSIGNED, OWNED, RESIGNED, DONE, ERR, + }; + + public static State valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private State(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:SplitLogTask.State) + } + + private int bitField0_; + // required .SplitLogTask.State state = 1; + public static final int STATE_FIELD_NUMBER = 1; + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State state_; + public boolean hasState() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State getState() { + return state_; + } + + // required .ServerName serverName = 2; + public static final int SERVERNAME_FIELD_NUMBER = 2; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName serverName_; + public boolean hasServerName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getServerName() { + return serverName_; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder() { + return serverName_; + } + + private void initFields() { + state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED; + serverName_ = 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasState()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasServerName()) { + memoizedIsInitialized = 0; + return false; + } + if (!getServerName().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeEnum(1, state_.getNumber()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeMessage(2, serverName_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(1, state_.getNumber()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(2, serverName_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask other = 
(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask) obj; + + boolean result = true; + result = result && (hasState() == other.hasState()); + if (hasState()) { + result = result && + (getState() == other.getState()); + } + result = result && (hasServerName() == other.hasServerName()); + if (hasServerName()) { + result = result && getServerName() + .equals(other.getServerName()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasState()) { + hash = (37 * hash) + STATE_FIELD_NUMBER; + hash = (53 * hash) + hashEnum(getState()); + } + if (hasServerName()) { + hash = (37 * hash) + SERVERNAME_FIELD_NUMBER; + hash = (53 * hash) + getServerName().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws 
com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder 
newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTaskOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_SplitLogTask_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_SplitLogTask_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getServerNameFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED; + bitField0_ = (bitField0_ & ~0x00000001); + if (serverNameBuilder_ == null) { + serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + } else { + serverNameBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return 
this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask build() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask result = new org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.state_ = state_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + if (serverNameBuilder_ == null) { + result.serverName_ = serverName_; + } else { + result.serverName_ = serverNameBuilder_.build(); + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message 
other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDefaultInstance()) return this; + if (other.hasState()) { + setState(other.getState()); + } + if (other.hasServerName()) { + mergeServerName(other.getServerName()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasState()) { + + return false; + } + if (!hasServerName()) { + + return false; + } + if (!getServerName().isInitialized()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 8: { + int rawValue = input.readEnum(); + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State value = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(1, rawValue); + } else { + bitField0_ |= 0x00000001; + state_ = value; + } + break; + } + case 18: { + 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(); + if (hasServerName()) { + subBuilder.mergeFrom(getServerName()); + } + input.readMessage(subBuilder, extensionRegistry); + setServerName(subBuilder.buildPartial()); + break; + } + } + } + } + + private int bitField0_; + + // required .SplitLogTask.State state = 1; + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED; + public boolean hasState() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State getState() { + return state_; + } + public Builder setState(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + state_ = value; + onChanged(); + return this; + } + public Builder clearState() { + bitField0_ = (bitField0_ & ~0x00000001); + state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED; + onChanged(); + return this; + } + + // required .ServerName serverName = 2; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> serverNameBuilder_; + public boolean hasServerName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getServerName() { + if 
(serverNameBuilder_ == null) { + return serverName_; + } else { + return serverNameBuilder_.getMessage(); + } + } + public Builder setServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) { + if (serverNameBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + serverName_ = value; + onChanged(); + } else { + serverNameBuilder_.setMessage(value); + } + bitField0_ |= 0x00000002; + return this; + } + public Builder setServerName( + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder builderForValue) { + if (serverNameBuilder_ == null) { + serverName_ = builderForValue.build(); + onChanged(); + } else { + serverNameBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000002; + return this; + } + public Builder mergeServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) { + if (serverNameBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002) && + serverName_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) { + serverName_ = + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(serverName_).mergeFrom(value).buildPartial(); + } else { + serverName_ = value; + } + onChanged(); + } else { + serverNameBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000002; + return this; + } + public Builder clearServerName() { + if (serverNameBuilder_ == null) { + serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + onChanged(); + } else { + serverNameBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getServerNameBuilder() { + bitField0_ |= 0x00000002; + onChanged(); + return getServerNameFieldBuilder().getBuilder(); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder 
getServerNameOrBuilder() { + if (serverNameBuilder_ != null) { + return serverNameBuilder_.getMessageOrBuilder(); + } else { + return serverName_; + } + } + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> + getServerNameFieldBuilder() { + if (serverNameBuilder_ == null) { + serverNameBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>( + serverName_, + getParentForChildren(), + isClean()); + serverName_ = null; + } + return serverNameBuilder_; + } + + // @@protoc_insertion_point(builder_scope:SplitLogTask) + } + + static { + defaultInstance = new SplitLogTask(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:SplitLogTask) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_RootRegionServer_descriptor; private static @@ -1806,6 +3203,16 @@ public final class ZooKeeperProtos { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_ClusterUp_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_RegionTransition_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_RegionTransition_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_SplitLogTask_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_SplitLogTask_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -1819,9 +3226,16 @@ public final class 
ZooKeeperProtos { "gionServer\022\033\n\006server\030\001 \002(\0132\013.ServerName\"" + "%\n\006Master\022\033\n\006master\030\001 \002(\0132\013.ServerName\"\036" + "\n\tClusterId\022\021\n\tclusterId\030\001 \002(\t\"\036\n\tCluste" + - "rUp\022\021\n\tstartDate\030\001 \002(\tBE\n*org.apache.had" + - "oop.hbase.protobuf.generatedB\017ZooKeeperP" + - "rotosH\001\210\001\001\240\001\001" + "rUp\022\021\n\tstartDate\030\001 \002(\t\"\211\001\n\020RegionTransit" + + "ion\022\025\n\reventTypeCode\030\001 \002(\r\022\022\n\nregionName" + + "\030\002 \002(\014\022\022\n\ncreateTime\030\003 \002(\004\022%\n\020originServ" + + "erName\030\004 \001(\0132\013.ServerName\022\017\n\007payload\030\005 \001" + + "(\014\"\230\001\n\014SplitLogTask\022\"\n\005state\030\001 \002(\0162\023.Spl" + + "itLogTask.State\022\037\n\nserverName\030\002 \002(\0132\013.Se", + "rverName\"C\n\005State\022\016\n\nUNASSIGNED\020\000\022\t\n\005OWN" + + "ED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004BE\n" + + "*org.apache.hadoop.hbase.protobuf.genera" + + "tedB\017ZooKeeperProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -1860,6 +3274,22 @@ public final class ZooKeeperProtos { new java.lang.String[] { "StartDate", }, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ClusterUp.class, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ClusterUp.Builder.class); + internal_static_RegionTransition_descriptor = + getDescriptor().getMessageTypes().get(4); + internal_static_RegionTransition_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_RegionTransition_descriptor, + new java.lang.String[] { "EventTypeCode", "RegionName", "CreateTime", "OriginServerName", "Payload", }, + 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.class, + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.Builder.class); + internal_static_SplitLogTask_descriptor = + getDescriptor().getMessageTypes().get(5); + internal_static_SplitLogTask_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_SplitLogTask_descriptor, + new java.lang.String[] { "State", "ServerName", }, + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.class, + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.Builder.class); return null; } }; diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 36ea2ec..8953fb5 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1394,7 +1394,7 @@ public class HRegionServer extends RegionServer // Create the log splitting worker and start it this.splitLogWorker = new SplitLogWorker(this.zooKeeper, - this.getConfiguration(), this.getServerName().toString()); + this.getConfiguration(), this.getServerName()); splitLogWorker.start(); } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 8ea342f..bc505e4 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -19,8 +19,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.List; @@ -32,12 +30,14 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.DeserializationException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SplitLogCounters; +import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -71,7 +71,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { private static final Log LOG = LogFactory.getLog(SplitLogWorker.class); Thread worker; - private final String serverName; + private final ServerName serverName; private final TaskExecutor splitTaskExecutor; private long zkretries; @@ -85,7 +85,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, - String serverName, TaskExecutor splitTaskExecutor) { + ServerName serverName, TaskExecutor splitTaskExecutor) { super(watcher); this.serverName = serverName; this.splitTaskExecutor = splitTaskExecutor; @@ -93,7 +93,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf, - final String serverName) { + final ServerName serverName) { this(watcher, conf, serverName, new TaskExecutor () { @Override public Status exec(String filename, CancelableProgressable p) { @@ -111,7 +111,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { // encountered a bad non-retry-able persistent error. 
try { String tmpname = - ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename); + ZKSplitLog.getSplitLogDirTmpComponent(serverName.toString(), filename); if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname, fs.getFileStatus(new Path(filename)), fs, conf, p) == false) { return Status.PREEMPTED; @@ -241,31 +241,40 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { try { try { if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) { - tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); return; } } catch (KeeperException e) { LOG.warn("Failed to get data for znode " + path, e); - tot_wkr_failed_to_grab_task_exception.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); + return; + } + SplitLogTask slt; + try { + slt = SplitLogTask.parseFrom(data); + } catch (DeserializationException e) { + LOG.warn("Failed parse data for znode " + path, e); + SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); return; } - if (TaskState.TASK_UNASSIGNED.equals(data) == false) { - tot_wkr_failed_to_grab_task_owned.incrementAndGet(); + if (slt.isUnassigned() == false) { + SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet(); return; } currentVersion = stat.getVersion(); if (attemptToOwnTask(true) == false) { - tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); return; } if (ZKSplitLog.isRescanNode(watcher, currentTask)) { - endTask(TaskState.TASK_DONE, tot_wkr_task_acquired_rescan); + endTask(new SplitLogTask.Done(this.serverName), + SplitLogCounters.tot_wkr_task_acquired_rescan); return; } LOG.info("worker " + serverName + " acquired task " + path); - tot_wkr_task_acquired.incrementAndGet(); + SplitLogCounters.tot_wkr_task_acquired.incrementAndGet(); getDataSetWatchAsync(); t = System.currentTimeMillis(); 
@@ -285,15 +294,17 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { }); switch (status) { case DONE: - endTask(TaskState.TASK_DONE, tot_wkr_task_done); + endTask(new SplitLogTask.Done(this.serverName), + SplitLogCounters.tot_wkr_task_done); break; case PREEMPTED: - tot_wkr_preempt_task.incrementAndGet(); + SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); LOG.warn("task execution prempted " + path); break; case ERR: if (!exitWorker) { - endTask(TaskState.TASK_ERR, tot_wkr_task_err); + endTask(new SplitLogTask.Err(this.serverName), + SplitLogCounters.tot_wkr_task_err); break; } // if the RS is exiting then there is probably a tons of stuff @@ -303,9 +314,10 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { if (exitWorker) { LOG.info("task execution interrupted because worker is exiting " + path); - endTask(TaskState.TASK_RESIGNED, tot_wkr_task_resigned); + endTask(new SplitLogTask.Resigned(this.serverName), + SplitLogCounters.tot_wkr_task_resigned); } else { - tot_wkr_preempt_task.incrementAndGet(); + SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); LOG.info("task execution interrupted via zk by manager " + path); } @@ -337,15 +349,16 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { */ private boolean attemptToOwnTask(boolean isFirstTime) { try { - Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask, - TaskState.TASK_OWNED.get(serverName), currentVersion); + SplitLogTask slt = new SplitLogTask.Owned(this.serverName); + Stat stat = + this.watcher.getRecoverableZooKeeper().setData(currentTask, slt.toByteArray(), currentVersion); if (stat == null) { LOG.warn("zk.setData() returned null for path " + currentTask); - tot_wkr_task_heartbeat_failed.incrementAndGet(); + SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); return (false); } currentVersion = stat.getVersion(); - tot_wkr_task_heartbeat.incrementAndGet(); + 
SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet(); return (true); } catch (KeeperException e) { if (!isFirstTime) { @@ -363,7 +376,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { currentTask + " " + StringUtils.stringifyException(e1)); Thread.currentThread().interrupt(); } - tot_wkr_task_heartbeat_failed.incrementAndGet(); + SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); return (false); } @@ -373,29 +386,28 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { * @param ts * @param ctr */ - private void endTask(ZKSplitLog.TaskState ts, AtomicLong ctr) { + private void endTask(SplitLogTask slt, AtomicLong ctr) { String path = currentTask; currentTask = null; try { - if (ZKUtil.setData(this.watcher, path, ts.get(serverName), + if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), currentVersion)) { - LOG.info("successfully transitioned task " + path + - " to final state " + ts); + LOG.info("successfully transitioned task " + path + " to final state " + slt); ctr.incrementAndGet(); return; } - LOG.warn("failed to transistion task " + path + " to end state " + ts + + LOG.warn("failed to transistion task " + path + " to end state " + slt + " because of version mismatch "); } catch (KeeperException.BadVersionException bve) { - LOG.warn("transisition task " + path + " to " + ts + + LOG.warn("transisition task " + path + " to " + slt + " failed because of version mismatch", bve); } catch (KeeperException.NoNodeException e) { - LOG.fatal("logic error - end task " + path + " " + ts + + LOG.fatal("logic error - end task " + path + " " + slt + " failed because task doesn't exist", e); } catch (KeeperException e) { - LOG.warn("failed to end task, " + path + " " + ts, e); + LOG.warn("failed to end task, " + path + " " + slt, e); } - tot_wkr_final_transistion_failed.incrementAndGet(); + SplitLogCounters.tot_wkr_final_transistion_failed.incrementAndGet(); return; } @@ -403,10 +415,17 @@ public 
class SplitLogWorker extends ZooKeeperListener implements Runnable { this.watcher.getRecoverableZooKeeper().getZooKeeper(). getData(currentTask, this.watcher, new GetDataAsyncCallback(), null); - tot_wkr_get_data_queued.incrementAndGet(); + SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet(); } void getDataSetWatchSuccess(String path, byte[] data) { + SplitLogTask slt; + try { + slt = SplitLogTask.parseFrom(data); + } catch (DeserializationException e) { + LOG.warn("Failed parse", e); + return; + } synchronized (grabTaskLock) { if (workerInGrabTask) { // currentTask can change but that's ok @@ -418,13 +437,12 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { // UNASSIGNED because by the time this worker sets the data watch // the node might have made two transitions - from owned by this // worker to unassigned to owned by another worker - if (! TaskState.TASK_OWNED.equals(data, serverName) && - ! TaskState.TASK_DONE.equals(data, serverName) && - ! TaskState.TASK_ERR.equals(data, serverName) && - ! TaskState.TASK_RESIGNED.equals(data, serverName)) { + if (! slt.isOwned(this.serverName) && + ! slt.isDone(this.serverName) && + ! slt.isErr(this.serverName) && + ! 
slt.isResigned(this.serverName)) { LOG.info("task " + taskpath + " preempted from " + - serverName + ", current task state and owner=" + - new String(data)); + serverName + ", current task state and owner=" + slt.toString()); stopTask(); } } @@ -439,7 +457,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { String taskpath = currentTask; if (taskpath != null && taskpath.equals(path)) { LOG.info("retrying data watch on " + path); - tot_wkr_get_data_retry.incrementAndGet(); + SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet(); getDataSetWatchAsync(); } else { // no point setting a watch on the task which this worker is not @@ -543,9 +561,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); @Override - public void processResult(int rc, String path, Object ctx, byte[] data, - Stat stat) { - tot_wkr_get_data_result.incrementAndGet(); + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + SplitLogCounters.tot_wkr_get_data_result.incrementAndGet(); if (rc != 0) { LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); getDataSetWatchFailure(path); @@ -573,4 +590,4 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } public Status exec(String name, CancelableProgressable p); } -} +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java index ea12da4..231e2d6 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -39,11 +39,11 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; +import 
org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.executor.EventHandler.EventType; -import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.io.Reference.Range; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; @@ -858,12 +858,10 @@ public class SplitTransaction { throws KeeperException, IOException { LOG.debug(zkw.prefix("Creating ephemeral node for " + region.getEncodedName() + " in SPLITTING state")); - RegionTransitionData data = - new RegionTransitionData(EventType.RS_ZK_REGION_SPLITTING, + RegionTransition rt = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_SPLITTING, region.getRegionName(), serverName); - String node = ZKAssign.getNodeName(zkw, region.getEncodedName()); - if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, data.getBytes())) { + if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) { throw new IOException("Failed create of ephemeral " + node); } // Transition node from SPLITTING to SPLITTING and pick up version so we diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 587386c..bafa069 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import 
org.apache.hadoop.hbase.regionserver.HRegion; @@ -375,7 +376,7 @@ public class HLogSplitter { // How often to send a progress report (default 1/2 the zookeeper session // timeout of if that not set, the split log DEFAULT_TIMEOUT) int period = conf.getInt("hbase.splitlog.report.period", - conf.getInt("hbase.splitlog.manager.timeout", ZKSplitLog.DEFAULT_TIMEOUT) / 2); + conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 2); int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); Path logPath = logfile.getPath(); diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/EmptyWatcher.java src/main/java/org/apache/hadoop/hbase/zookeeper/EmptyWatcher.java new file mode 100644 index 0000000..acdbdd0 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/zookeeper/EmptyWatcher.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.WatchedEvent; + +/** + * An empty ZooKeeper watcher + */ +@InterfaceAudience.Private +public class EmptyWatcher implements Watcher { + // Used in this package but also by tests so needs to be public + public static EmptyWatcher instance = new EmptyWatcher(); + private EmptyWatcher() {} + + public void process(WatchedEvent event) {} +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java index f9575af..9b85a7e 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; @@ -80,7 +81,12 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker { * @return Server name or null if timed out. 
*/ public ServerName getMasterAddress(final boolean refresh) { - return ZKUtil.znodeContentToServerName(super.getData(refresh)); + try { + return ServerName.parseFrom(super.getData(refresh)); + } catch (DeserializationException e) { + LOG.warn("Failed parse", e); + return null; + } } /** @@ -99,7 +105,13 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker { if (data == null){ throw new IOException("Can't get master address from ZooKeeper; znode data == null"); } - return ZKUtil.znodeContentToServerName(data); + try { + return ServerName.parseFrom(data); + } catch (DeserializationException e) { + KeeperException ke = new KeeperException.DataInconsistencyException(); + ke.initCause(e); + throw ke; + } } /** @@ -116,7 +128,7 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker { public static boolean setMasterAddress(final ZooKeeperWatcher zkw, final String znode, final ServerName master) throws KeeperException { - return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, getZNodeData(master)); + return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, toByteArray(master)); } /** @@ -132,7 +144,7 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker { * @return Content of the master znode as a serialized pb with the pb * magic as prefix. 
*/ - static byte [] getZNodeData(final ServerName sn) { + static byte [] toByteArray(final ServerName sn) { ZooKeeperProtos.Master.Builder mbuilder = ZooKeeperProtos.Master.newBuilder(); HBaseProtos.ServerName.Builder snbuilder = HBaseProtos.ServerName.newBuilder(); snbuilder.setHostName(sn.getHostname()); diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java index babde80..0808912 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java @@ -19,17 +19,13 @@ package org.apache.hadoop.hbase.zookeeper; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RootRegionServer; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.zookeeper.KeeperException; -import com.google.protobuf.InvalidProtocolBufferException; - /** * Tracks the root region server location node in zookeeper. 
* Root region location is set by {@link RootLocationEditor} usually called @@ -64,7 +60,12 @@ public class RootRegionTracker extends ZooKeeperNodeTracker { * @throws InterruptedException */ public ServerName getRootRegionLocation() throws InterruptedException { - return ZKUtil.znodeContentToServerName(super.getData(true)); + try { + return ServerName.parseFrom(super.getData(true)); + } catch (DeserializationException e) { + LOG.warn("Failed parse", e); + return null; + } } /** @@ -76,7 +77,11 @@ public class RootRegionTracker extends ZooKeeperNodeTracker { */ public static ServerName getRootRegionLocation(final ZooKeeperWatcher zkw) throws KeeperException { - return ZKUtil.znodeContentToServerName(ZKUtil.getData(zkw, zkw.rootServerZNode)); + try { + return ServerName.parseFrom(ZKUtil.getData(zkw, zkw.rootServerZNode)); + } catch (DeserializationException e) { + throw ZKUtil.convert(e); + } } /** @@ -97,7 +102,12 @@ public class RootRegionTracker extends ZooKeeperNodeTracker { LOG.error(errorMsg); throw new IllegalArgumentException(errorMsg); } - return ZKUtil.znodeContentToServerName(super.blockUntilAvailable(timeout, true)); + try { + return ServerName.parseFrom(super.blockUntilAvailable(timeout, true)); + } catch (DeserializationException e) { + LOG.warn("Failed parse", e); + return null; + } } /** @@ -113,7 +123,7 @@ public class RootRegionTracker extends ZooKeeperNodeTracker { LOG.info("Setting ROOT region location in ZooKeeper as " + location); // Make the RootRegionServer pb and then get its bytes and save this as // the znode content. - byte [] data = getZNodeData(location); + byte [] data = toByteArray(location); try { ZKUtil.createAndWatch(zookeeper, zookeeper.rootServerZNode, data); } catch(KeeperException.NodeExistsException nee) { @@ -127,7 +137,7 @@ public class RootRegionTracker extends ZooKeeperNodeTracker { * @param sn What to put into the znode. 
* @return The content of the root-region-server znode */ - static byte [] getZNodeData(final ServerName sn) { + static byte [] toByteArray(final ServerName sn) { // ZNode content is a pb message preceeded by some pb magic. HBaseProtos.ServerName pbsn = HBaseProtos.ServerName.newBuilder().setHostName(sn.getHostname()). @@ -164,6 +174,12 @@ public class RootRegionTracker extends ZooKeeperNodeTracker { final long timeout) throws InterruptedException { byte [] data = ZKUtil.blockUntilAvailable(zkw, zkw.rootServerZNode, timeout); - return ZKUtil.znodeContentToServerName(data); + if (data == null) return null; + try { + return ServerName.parseFrom(data); + } catch (DeserializationException e) { + LOG.warn("Failed parse", e); + return null; + } } } diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java index e94b672..0796294 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java @@ -25,9 +25,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.DeserializationException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.executor.RegionTransitionData; +// We should not be importing this Type here, nor a RegionTransition, etc. This class should be +// about zk and bytes only. 
import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; @@ -145,10 +149,10 @@ public class ZKAssign { throws KeeperException, KeeperException.NodeExistsException { LOG.debug(zkw.prefix("Creating unassigned node for " + region.getEncodedName() + " in OFFLINE state")); - RegionTransitionData data = new RegionTransitionData(event, - region.getRegionName(), serverName); + RegionTransition rt = + RegionTransition.createRegionTransition(event, region.getRegionName(), serverName); String node = getNodeName(zkw, region.getEncodedName()); - ZKUtil.createAndWatch(zkw, node, data.getBytes()); + ZKUtil.createAndWatch(zkw, node, rt.toByteArray()); } /** @@ -172,10 +176,10 @@ public class ZKAssign { throws KeeperException { LOG.debug(zkw.prefix("Async create of unassigned node for " + region.getEncodedName() + " with OFFLINE state")); - RegionTransitionData data = new RegionTransitionData( - EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName); + RegionTransition rt = + RegionTransition.createRegionTransition(EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName); String node = getNodeName(zkw, region.getEncodedName()); - ZKUtil.asyncCreate(zkw, node, data.getBytes(), cb, ctx); + ZKUtil.asyncCreate(zkw, node, rt.toByteArray(), cb, ctx); } /** @@ -201,10 +205,10 @@ public class ZKAssign { throws KeeperException, KeeperException.NoNodeException { LOG.debug(zkw.prefix("Forcing existing unassigned node for " + region.getEncodedName() + " to OFFLINE state")); - RegionTransitionData data = new RegionTransitionData( - EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName); + RegionTransition rt = + RegionTransition.createRegionTransition(EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName); String node = getNodeName(zkw, region.getEncodedName()); - ZKUtil.setData(zkw, node, data.getBytes()); + ZKUtil.setData(zkw, node, rt.toByteArray()); } 
/** @@ -267,8 +271,9 @@ public class ZKAssign { throws KeeperException { LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " + region.getEncodedName() + " with OFFLINE state")); - RegionTransitionData data = new RegionTransitionData( - EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName); + RegionTransition rt = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_OFFLINE, + region.getRegionName(), serverName, HConstants.EMPTY_BYTE_ARRAY); + byte [] data = rt.toByteArray(); String node = getNodeName(zkw, region.getEncodedName()); Stat stat = new Stat(); zkw.sync(node); @@ -282,15 +287,20 @@ public class ZKAssign { if (hijack && !allowCreation) { return -1; } - return ZKUtil.createAndWatch(zkw, node, data.getBytes()); + return ZKUtil.createAndWatch(zkw, node, data); } else { - RegionTransitionData curDataInZNode = ZKAssign.getDataNoWatch(zkw, region - .getEncodedName(), stat); + byte [] curDataInZNode = ZKAssign.getDataNoWatch(zkw, region.getEncodedName(), stat); + RegionTransition curRt; + try { + curRt = curDataInZNode == null? null: RegionTransition.parseFrom(curDataInZNode); + } catch (DeserializationException e) { + throw ZKUtil.convert(e); + } // Do not move the node to OFFLINE if znode is in any of the following // state. // Because these are already executed states. - if (hijack && null != curDataInZNode) { - EventType eventType = curDataInZNode.getEventType(); + if (hijack && curRt != null) { + EventType eventType = curRt.getEventType(); if (eventType.equals(EventType.M_ZK_REGION_CLOSING) || eventType.equals(EventType.RS_ZK_REGION_CLOSED) || eventType.equals(EventType.RS_ZK_REGION_OPENED)) { @@ -300,7 +310,7 @@ public class ZKAssign { boolean setData = false; try { - setData = ZKUtil.setData(zkw, node, data.getBytes(), version); + setData = ZKUtil.setData(zkw, node, data, version); // Setdata throws KeeperException which aborts the Master. So we are // catching it here. 
// If just before setting the znode to OFFLINE if the RS has made any @@ -315,9 +325,13 @@ public class ZKAssign { } else { // We successfully forced to OFFLINE, reset watch and handle if // the state changed in between our set and the watch - RegionTransitionData curData = - ZKAssign.getData(zkw, region.getEncodedName()); - if (curData.getEventType() != data.getEventType()) { + byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); + try { + rt = RegionTransition.parseFrom(bytes); + } catch (DeserializationException e) { + throw ZKUtil.convert(e); + } + if (rt.getEventType() != EventType.M_ZK_REGION_OFFLINE) { // state changed, need to process return -1; } @@ -494,11 +508,17 @@ public class ZKAssign { // If it came back null, node does not exist. throw KeeperException.create(Code.NONODE); } - RegionTransitionData data = RegionTransitionData.fromBytes(bytes); - if (!data.getEventType().equals(expectedState)) { - LOG.warn(zkw.prefix("Attempting to delete unassigned " + - "node " + regionName + " in " + expectedState + - " state but node is in " + data.getEventType() + " state")); + RegionTransition rt; + try { + rt = RegionTransition.parseFrom(bytes); + } catch (DeserializationException e) { + // Convert exception. 
Otherwise have to alter public api + throw ZKUtil.convert(e); + } + EventType et = rt.getEventType(); + if (!et.equals(expectedState)) { + LOG.warn(zkw.prefix("Attempting to delete unassigned node " + regionName + " in " + + expectedState + " state but node is in " + et + " state")); return false; } if (expectedVersion != -1 @@ -564,12 +584,10 @@ public class ZKAssign { throws KeeperException, KeeperException.NodeExistsException { LOG.debug(zkw.prefix("Creating unassigned node for " + region.getEncodedName() + " in a CLOSING state")); - - RegionTransitionData data = new RegionTransitionData( - EventType.M_ZK_REGION_CLOSING, region.getRegionName(), serverName); - + RegionTransition rt = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_CLOSING, + region.getRegionName(), serverName, HConstants.EMPTY_BYTE_ARRAY); String node = getNodeName(zkw, region.getEncodedName()); - return ZKUtil.createAndWatch(zkw, node, data.getBytes()); + return ZKUtil.createAndWatch(zkw, node, rt.toByteArray()); } /** @@ -748,8 +766,7 @@ public class ZKAssign { ServerName serverName, EventType beginState, EventType endState, int expectedVersion) throws KeeperException { - return transitionNode(zkw, region, serverName, beginState, endState, - expectedVersion, null); + return transitionNode(zkw, region, serverName, beginState, endState, expectedVersion, null); } public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region, @@ -773,11 +790,16 @@ public class ZKAssign { // Node no longer exists. Return -1. It means unsuccessful transition. return -1; } - RegionTransitionData existingData = - RegionTransitionData.fromBytes(existingBytes); + RegionTransition rt; + try { + rt = RegionTransition.parseFrom(existingBytes); + } catch (DeserializationException e) { + // Convert to a zk exception for now. 
Otherwise have to change API + throw ZKUtil.convert(e); + } // Verify it is the expected version - if(expectedVersion != -1 && stat.getVersion() != expectedVersion) { + if (expectedVersion != -1 && stat.getVersion() != expectedVersion) { LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for " + encoded + " from " + beginState + " to " + endState + " failed, " + @@ -799,20 +821,19 @@ public class ZKAssign { } // Verify it is in expected state - if(!existingData.getEventType().equals(beginState)) { + EventType et = rt.getEventType(); + if (!et.equals(beginState)) { LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for " + encoded + " from " + beginState + " to " + endState + " failed, " + - "the node existed but was in the state " + existingData.getEventType() + - " set by the server " + serverName)); + "the node existed but was in the state " + et + " set by the server " + serverName)); return -1; } // Write new data, ensuring data has not changed since we last read it try { - RegionTransitionData data = new RegionTransitionData(endState, - region.getRegionName(), serverName, payload); - if(!ZKUtil.setData(zkw, node, data.getBytes(), stat.getVersion())) { + rt = RegionTransition.createRegionTransition(endState, region.getRegionName(), serverName, payload); + if(!ZKUtil.setData(zkw, node, rt.toByteArray(), stat.getVersion())) { LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for " + encoded + " from " + beginState + " to " + endState + " failed, " + @@ -845,19 +866,14 @@ public class ZKAssign { * * @param zkw zk reference * @param pathOrRegionName fully-specified path or region name - * @return data for the unassigned node + * @return znode content * @throws KeeperException if unexpected zookeeper exception */ - public static RegionTransitionData getData(ZooKeeperWatcher zkw, + public static byte [] getData(ZooKeeperWatcher zkw, String pathOrRegionName) throws KeeperException { - String node = 
pathOrRegionName.startsWith("/") ? - pathOrRegionName : getNodeName(zkw, pathOrRegionName); - byte [] data = ZKUtil.getDataAndWatch(zkw, node); - if(data == null) { - return null; - } - return RegionTransitionData.fromBytes(data); + String node = getPath(zkw, pathOrRegionName); + return ZKUtil.getDataAndWatch(zkw, node); } /** @@ -871,19 +887,14 @@ public class ZKAssign { * @param zkw zk reference * @param pathOrRegionName fully-specified path or region name * @param stat object to populate the version. - * @return data for the unassigned node + * @return znode content * @throws KeeperException if unexpected zookeeper exception */ - public static RegionTransitionData getDataAndWatch(ZooKeeperWatcher zkw, + public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String pathOrRegionName, Stat stat) throws KeeperException { - String node = pathOrRegionName.startsWith("/") ? - pathOrRegionName : getNodeName(zkw, pathOrRegionName); - byte [] data = ZKUtil.getDataAndWatch(zkw, node, stat); - if(data == null) { - return null; - } - return RegionTransitionData.fromBytes(data); + String node = getPath(zkw, pathOrRegionName); + return ZKUtil.getDataAndWatch(zkw, node, stat); } /** @@ -897,19 +908,23 @@ public class ZKAssign { * @param zkw zk reference * @param pathOrRegionName fully-specified path or region name * @param stat object to store node info into on getData call - * @return data for the unassigned node or null if node does not exist + * @return znode content * @throws KeeperException if unexpected zookeeper exception */ - public static RegionTransitionData getDataNoWatch(ZooKeeperWatcher zkw, + public static byte [] getDataNoWatch(ZooKeeperWatcher zkw, String pathOrRegionName, Stat stat) throws KeeperException { - String node = pathOrRegionName.startsWith("/") ? 
- pathOrRegionName : getNodeName(zkw, pathOrRegionName); - byte [] data = ZKUtil.getDataNoWatch(zkw, node, stat); - if (data == null) { - return null; - } - return RegionTransitionData.fromBytes(data); + String node = getPath(zkw, pathOrRegionName); + return ZKUtil.getDataNoWatch(zkw, node, stat); + } + + /** + * @param zkw + * @param pathOrRegionName + * @return Path to znode + */ + private static String getPath(final ZooKeeperWatcher zkw, final String pathOrRegionName) { + return pathOrRegionName.startsWith("/")? pathOrRegionName : getNodeName(zkw, pathOrRegionName); } /** @@ -983,42 +998,18 @@ public class ZKAssign { } /** - * Verifies that the specified region is in the specified state in ZooKeeper. - *

- * Returns true if region is in transition and in the specified state in - * ZooKeeper. Returns false if the region does not exist in ZK or is in - * a different state. - *

- * Method synchronizes() with ZK so will yield an up-to-date result but is - * a slow read. - * @param zkw - * @param region - * @param expectedState - * @return true if region exists and is in expected state + * Presume bytes are serialized unassigned data structure + * @param znodeBytes + * @return String of the deserialized znode bytes. */ - public static boolean verifyRegionState(ZooKeeperWatcher zkw, - HRegionInfo region, EventType expectedState) - throws KeeperException { - String encoded = region.getEncodedName(); - - String node = getNodeName(zkw, encoded); - zkw.sync(node); - - // Read existing data of the node - byte [] existingBytes = null; + static String toString(final byte[] znodeBytes) { + // This method should not exist. Used by ZKUtil stringifying RegionTransition. Have the + // method in here so RegionTransition does not leak into ZKUtil. try { - existingBytes = ZKUtil.getDataAndWatch(zkw, node); - } catch (KeeperException.NoNodeException nne) { - return false; - } catch (KeeperException e) { - throw e; - } - if (existingBytes == null) return false; - RegionTransitionData existingData = - RegionTransitionData.fromBytes(existingBytes); - if (existingData.getEventType() == expectedState){ - return true; + RegionTransition rt = RegionTransition.parseFrom(znodeBytes); + return rt.toString(); + } catch (DeserializationException e) { + return ""; } - return false; } -} +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index 30d7fe9..8ca13a8 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -1,7 +1,5 @@ /** - * Copyright 2011 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -21,13 +19,10 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.lang.reflect.Field; import java.net.URLDecoder; import java.net.URLEncoder; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,30 +32,23 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; -import org.apache.hadoop.hbase.util.Bytes; /** - * Common methods and attributes used by {@link SplitLogManager} and - * {@link SplitLogWorker} + * Common methods and attributes used by {@link SplitLogManager} and {@link SplitLogWorker} + * running distributed splitting of WAL logs. */ @InterfaceAudience.Private public class ZKSplitLog { private static final Log LOG = LogFactory.getLog(ZKSplitLog.class); - public static final int DEFAULT_TIMEOUT = 25000; // 25 sec - public static final int DEFAULT_ZK_RETRIES = 3; - public static final int DEFAULT_MAX_RESUBMIT = 3; - public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min - /** * Gets the full path node name for the log file being split. * This method will url encode the filename. 
* @param zkw zk reference * @param filename log file name (only the basename) */ - public static String getEncodedNodeName(ZooKeeperWatcher zkw, - String filename) { - return ZKUtil.joinZNode(zkw.splitLogZNode, encode(filename)); + public static String getEncodedNodeName(ZooKeeperWatcher zkw, String filename) { + return ZKUtil.joinZNode(zkw.splitLogZNode, encode(filename)); } public static String getFileName(String node) { @@ -68,8 +56,7 @@ public class ZKSplitLog { return decode(basename); } - - public static String encode(String s) { + static String encode(String s) { try { return URLEncoder.encode(s, "UTF-8"); } catch (UnsupportedEncodingException e) { @@ -77,7 +64,7 @@ public class ZKSplitLog { } } - public static String decode(String s) { + static String decode(String s) { try { return URLDecoder.decode(s, "UTF-8"); } catch (UnsupportedEncodingException e) { @@ -107,53 +94,6 @@ public class ZKSplitLog { return dirname.equals(zkw.splitLogZNode); } - public static enum TaskState { - TASK_UNASSIGNED("unassigned"), - TASK_OWNED("owned"), - TASK_RESIGNED("resigned"), - TASK_DONE("done"), - TASK_ERR("err"); - - private final byte[] state; - private TaskState(String s) { - state = s.getBytes(); - } - - public byte[] get(String serverName) { - return (Bytes.add(state, " ".getBytes(), serverName.getBytes())); - } - - public String getWriterName(byte[] data) { - String str = Bytes.toString(data); - return str.substring(str.indexOf(' ') + 1); - } - - - /** - * @param s - * @return True if {@link #state} is a prefix of s. False otherwise. 
- */ - public boolean equals(byte[] s) { - if (s.length < state.length) { - return (false); - } - for (int i = 0; i < state.length; i++) { - if (state[i] != s[i]) { - return (false); - } - } - return (true); - } - - public boolean equals(byte[] s, String serverName) { - return (Arrays.equals(s, get(serverName))); - } - @Override - public String toString() { - return new String(state); - } - } - public static Path getSplitLogDir(Path rootdir, String tmpname) { return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname); } @@ -172,8 +112,8 @@ public class ZKSplitLog { return ret; } - public static String getSplitLogDirTmpComponent(String worker, String file) { - return (worker + "_" + ZKSplitLog.encode(file)); + public static String getSplitLogDirTmpComponent(final String worker, String file) { + return worker + "_" + ZKSplitLog.encode(file); } public static void markCorrupted(Path rootdir, String tmpname, @@ -198,81 +138,4 @@ public class ZKSplitLog { public static boolean isCorruptFlagFile(Path file) { return file.getName().equals("corrupt"); } - - - public static class Counters { - //SplitLogManager counters - public static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0); - public static AtomicLong tot_mgr_log_split_batch_success = - new AtomicLong(0); - public static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0); - public static AtomicLong tot_mgr_new_unexpected_hlogs = new AtomicLong(0); - public static AtomicLong tot_mgr_log_split_start = new AtomicLong(0); - public static AtomicLong tot_mgr_log_split_success = new AtomicLong(0); - public static AtomicLong tot_mgr_log_split_err = new AtomicLong(0); - public static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0); - public static AtomicLong tot_mgr_node_create_result = new AtomicLong(0); - public static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0); - public static AtomicLong tot_mgr_node_create_err = new AtomicLong(0); - public static AtomicLong 
tot_mgr_node_create_retry = new AtomicLong(0); - public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0); - public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0); - public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0); - public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0); - public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0); - public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0); - public static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0); - public static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0); - public static AtomicLong tot_mgr_resubmit = new AtomicLong(0); - public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0); - public static AtomicLong tot_mgr_null_data = new AtomicLong(0); - public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0); - public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0); - public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0); - public static AtomicLong tot_mgr_resubmit_threshold_reached = - new AtomicLong(0); - public static AtomicLong tot_mgr_missing_state_in_delete = - new AtomicLong(0); - public static AtomicLong tot_mgr_heartbeat = new AtomicLong(0); - public static AtomicLong tot_mgr_rescan = new AtomicLong(0); - public static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0); - public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0); - public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0); - public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0); - public static AtomicLong tot_mgr_resubmit_dead_server_task = - new AtomicLong(0); - - - - // SplitLogWorker counters - public static AtomicLong tot_wkr_failed_to_grab_task_no_data = - new AtomicLong(0); - public static AtomicLong tot_wkr_failed_to_grab_task_exception = - new AtomicLong(0); - public static AtomicLong tot_wkr_failed_to_grab_task_owned = - 
new AtomicLong(0); - public static AtomicLong tot_wkr_failed_to_grab_task_lost_race = - new AtomicLong(0); - public static AtomicLong tot_wkr_task_acquired = new AtomicLong(0); - public static AtomicLong tot_wkr_task_resigned = new AtomicLong(0); - public static AtomicLong tot_wkr_task_done = new AtomicLong(0); - public static AtomicLong tot_wkr_task_err = new AtomicLong(0); - public static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0); - public static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0); - public static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0); - public static AtomicLong tot_wkr_get_data_result = new AtomicLong(0); - public static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0); - public static AtomicLong tot_wkr_preempt_task = new AtomicLong(0); - public static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0); - public static AtomicLong tot_wkr_final_transistion_failed = - new AtomicLong(0); - - public static void resetCounters() throws Exception { - Class cl = (new Counters()).getClass(); - Field[] flds = cl.getDeclaredFields(); - for (Field fld : flds) { - ((AtomicLong)fld.get(null)).set(0); - } - } - } -} +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 46a6fde..f725348 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -26,7 +26,6 @@ import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Properties; @@ -36,28 +35,20 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.hbase.EmptyWatcher; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.executor.RegionTransitionData; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RootRegionServer; -import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; -import com.google.protobuf.InvalidProtocolBufferException; - /** * Internal HBase utility class for ZooKeeper. * @@ -359,8 +350,7 @@ public class ZKUtil { * null if parent does not exist * @throws KeeperException if unexpected zookeeper exception */ - public static List listChildrenNoWatch( - ZooKeeperWatcher zkw, String znode) + public static List listChildrenNoWatch(ZooKeeperWatcher zkw, String znode) throws KeeperException { List children = null; try { @@ -376,7 +366,9 @@ public class ZKUtil { /** * Simple class to hold a node path and node data. 
+ * @deprecated Unused */ + @Deprecated public static class NodeAndData { private String node; private byte [] data; @@ -392,7 +384,7 @@ public class ZKUtil { } @Override public String toString() { - return node + " (" + RegionTransitionData.fromBytes(data) + ")"; + return node; } public boolean isEmpty() { return (data.length == 0); @@ -600,6 +592,7 @@ public class ZKUtil { * @return list of data of children of the specified node, an empty list if the node * exists but has no children, and null if the node does not exist * @throws KeeperException if unexpected zookeeper exception + * @deprecated Unused */ public static List getChildDataAndWatchForNewChildren( ZooKeeperWatcher zkw, String baseNode) throws KeeperException { @@ -630,6 +623,7 @@ public class ZKUtil { * @param expectedVersion * @throws KeeperException if unexpected zookeeper exception * @throws KeeperException.BadVersionException if version mismatch + * @deprecated Unused */ public static void updateExistingNodeData(ZooKeeperWatcher zkw, String znode, byte [] data, int expectedVersion) @@ -1144,9 +1138,9 @@ public class ZKUtil { " byte(s) of data from znode " + znode + (watcherSet? " and set watcher; ": "; data=") + (data == null? "null": data.length == 0? "empty": ( - znode.startsWith(zkw.assignmentZNode) ? - RegionTransitionData.fromBytes(data).toString() - : StringUtils.abbreviate(Bytes.toStringBinary(data), 32))))); + znode.startsWith(zkw.assignmentZNode)? + ZKAssign.toString(data): // We should not be doing this reaching into another class + StringUtils.abbreviate(Bytes.toStringBinary(data), 32))))); } /** @@ -1222,44 +1216,14 @@ public class ZKUtil { /** - * Get a ServerName from the passed in znode data bytes. - * @param data ZNode data with a server name in it; can handle the old style - * servername where servername was host and port. Works too with data that - * begins w/ the pb 'PBUF' magic and that its then followed by a protobuf that - * has a serialized {@link ServerName} in it. 
- * @return Returns null if data is null else converts passed data - * to a ServerName instance. + * Convert a {@link DeserializationException} to a more palatable {@link KeeperException}. + * Used when can't let a {@link DeserializationException} out w/o changing public API. + * @param e Exception to convert + * @return Converted exception */ - public static ServerName znodeContentToServerName(final byte [] data) { - if (data == null || data.length <= 0) return null; - if (ProtobufUtil.isPBMagicPrefix(data)) { - int prefixLen = ProtobufUtil.lengthOfPBMagic(); - try { - RootRegionServer rss = - RootRegionServer.newBuilder().mergeFrom(data, prefixLen, data.length - prefixLen).build(); - HBaseProtos.ServerName sn = rss.getServer(); - return new ServerName(sn.getHostName(), sn.getPort(), sn.getStartCode()); - } catch (InvalidProtocolBufferException e) { - // A failed parse of the znode is pretty catastrophic. Rather than loop - // retrying hoping the bad bytes will changes, and rather than change - // the signature on this method to add an IOE which will send ripples all - // over the code base, throw a RuntimeException. This should "never" happen. - // Fail fast if it does. - throw new RuntimeException(e); - } - } - // The str returned could be old style -- pre hbase-1502 -- which was - // hostname and port seperated by a colon rather than hostname, port and - // startcode delimited by a ','. - String str = Bytes.toString(data); - int index = str.indexOf(ServerName.SERVERNAME_SEPARATOR); - if (index != -1) { - // Presume its ServerName serialized with versioned bytes. - return ServerName.parseVersionedServerName(data); - } - // Presume it a hostname:port format. 
- String hostname = Addressing.parseHostname(str); - int port = Addressing.parsePort(str); - return new ServerName(hostname, port, -1L); + public static KeeperException convert(final DeserializationException e) { + KeeperException ke = new KeeperException.DataInconsistencyException(); + ke.initCause(e); + return ke; } } \ No newline at end of file diff --git src/main/protobuf/ZooKeeper.proto src/main/protobuf/ZooKeeper.proto index 961ab65..98142ce 100644 --- src/main/protobuf/ZooKeeper.proto +++ src/main/protobuf/ZooKeeper.proto @@ -60,3 +60,33 @@ message ClusterUp { // the data is cluster startDate. required string startDate = 1; } + +/** + * What we write under unassigned up in zookeeper as a region moves through + * open/close, etc., regions. Details a region in transition. + */ +message RegionTransition { + // Code for EventType gotten by doing o.a.h.h.EventHandler.EventType.getCode() + required uint32 eventTypeCode = 1; + // Full regionname in bytes + required bytes regionName = 2; + required uint64 createTime = 3; + optional ServerName originServerName = 4; + optional bytes payload = 5; +} + +/** + * WAL SplitLog directory znodes have this for content. Used doing distributed + * WAL splitting. Holds current state and name of server that originated split. 
+ */ +message SplitLogTask { + enum State { + UNASSIGNED = 0; + OWNED = 1; + RESIGNED = 2; + DONE = 3; + ERR = 4; + } + required State state = 1; + required ServerName serverName = 2; +} diff --git src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index c3a1889..3a1dbb8 100644 --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKConfig; diff --git src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index d496d48..1391dc8 100644 --- src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; diff --git src/test/java/org/apache/hadoop/hbase/master/Mocking.java src/test/java/org/apache/hadoop/hbase/master/Mocking.java index 676d6bb..51f9308 100644 --- src/test/java/org/apache/hadoop/hbase/master/Mocking.java +++ src/test/java/org/apache/hadoop/hbase/master/Mocking.java @@ -23,9 +23,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import 
org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.executor.EventHandler.EventType; @@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -71,12 +74,13 @@ public class Mocking { * @param sn Name of the regionserver doing the 'opening' * @param hri Region we're 'opening'. * @throws KeeperException + * @throws DeserializationException */ static void fakeRegionServerRegionOpenInZK(final ZooKeeperWatcher w, final ServerName sn, final HRegionInfo hri) - throws KeeperException { + throws KeeperException, DeserializationException { // Wait till we see the OFFLINE zk node before we proceed. - while (!ZKAssign.verifyRegionState(w, hri, EventType.M_ZK_REGION_OFFLINE)) { + while (!verifyRegionState(w, hri, EventType.M_ZK_REGION_OFFLINE)) { Threads.sleep(1); } // Get current versionid else will fail on transition from OFFLINE to OPENING below @@ -94,4 +98,40 @@ public class Mocking { // We should be done now. The master open handler will notice the // transition and remove this regions znode. } + + /** + * Verifies that the specified region is in the specified state in ZooKeeper. + *

+ * Returns true if region is in transition and in the specified state in + * ZooKeeper. Returns false if the region does not exist in ZK or is in + * a different state. + *

+ * Method synchronizes() with ZK so will yield an up-to-date result but is + * a slow read. + * @param zkw + * @param region + * @param expectedState + * @return true if region exists and is in expected state + * @throws DeserializationException + */ + static boolean verifyRegionState(ZooKeeperWatcher zkw, HRegionInfo region, EventType expectedState) + throws KeeperException, DeserializationException { + String encoded = region.getEncodedName(); + + String node = ZKAssign.getNodeName(zkw, encoded); + zkw.sync(node); + + // Read existing data of the node + byte [] existingBytes = null; + try { + existingBytes = ZKUtil.getDataAndWatch(zkw, node); + } catch (KeeperException.NoNodeException nne) { + return false; + } catch (KeeperException e) { + throw e; + } + if (existingBytes == null) return false; + RegionTransition rt = RegionTransition.parseFrom(existingBytes); + return rt.getEventType().equals(expectedState); + } } diff --git src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index 36046f8..c3ccf21 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -29,23 +29,24 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerLoad; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import 
org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.executor.ExecutorService; -import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.EventHandler.EventType; +import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -61,10 +62,9 @@ import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.hbase.MediumTests; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.Watcher; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -159,10 +159,11 @@ public class TestAssignmentManager { * @throws IOException * @throws KeeperException * @throws InterruptedException + * @throws DeserializationException */ @Test(timeout = 5000) public void testBalanceOnMasterFailoverScenarioWithOpenedNode() - throws IOException, KeeperException, InterruptedException, ServiceException { + throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException { AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(this.server, this.serverManager); try { @@ -177,7 +178,7 @@ public class TestAssignmentManager { int versionid = ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1); assertNotSame(versionid, -1); - while 
(!ZKAssign.verifyRegionState(this.watcher, REGIONINFO, + while (!Mocking.verifyRegionState(this.watcher, REGIONINFO, EventType.M_ZK_REGION_OFFLINE)) { Threads.sleep(1); } @@ -205,7 +206,7 @@ public class TestAssignmentManager { @Test(timeout = 5000) public void testBalanceOnMasterFailoverScenarioWithClosedNode() - throws IOException, KeeperException, InterruptedException, ServiceException { + throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException { AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(this.server, this.serverManager); try { @@ -221,7 +222,7 @@ public class TestAssignmentManager { ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1); assertNotSame(versionid, -1); am.gate.set(false); - while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO, + while (!Mocking.verifyRegionState(this.watcher, REGIONINFO, EventType.M_ZK_REGION_OFFLINE)) { Threads.sleep(1); } @@ -249,7 +250,7 @@ public class TestAssignmentManager { @Test(timeout = 5000) public void testBalanceOnMasterFailoverScenarioWithOfflineNode() - throws IOException, KeeperException, InterruptedException, ServiceException { + throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException { AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(this.server, this.serverManager); try { @@ -264,7 +265,7 @@ public class TestAssignmentManager { int versionid = ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1); assertNotSame(versionid, -1); - while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO, + while (!Mocking.verifyRegionState(this.watcher, REGIONINFO, EventType.M_ZK_REGION_OFFLINE)) { Threads.sleep(1); } @@ -306,10 +307,11 @@ public class TestAssignmentManager { * from one server to another mocking regionserver responding over zk. 
* @throws IOException * @throws KeeperException + * @throws DeserializationException */ @Test public void testBalance() - throws IOException, KeeperException { + throws IOException, KeeperException, DeserializationException { // Create and startup an executor. This is used by AssignmentManager // handling zk callbacks. ExecutorService executor = startupMasterExecutor("testBalanceExecutor"); @@ -345,7 +347,7 @@ public class TestAssignmentManager { // balancer. The zk node will be OFFLINE waiting for regionserver to // transition it through OPENING, OPENED. Wait till we see the OFFLINE // zk node before we proceed. - while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO, EventType.M_ZK_REGION_OFFLINE)) { + while (!Mocking.verifyRegionState(this.watcher, REGIONINFO, EventType.M_ZK_REGION_OFFLINE)) { Threads.sleep(1); } // Get current versionid else will fail on transition from OFFLINE to OPENING below @@ -541,12 +543,12 @@ public class TestAssignmentManager { private static int createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region, final ServerName serverName) throws KeeperException, IOException { - RegionTransitionData data = - new RegionTransitionData(EventType.RS_ZK_REGION_SPLITTING, + RegionTransition rt = + RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_SPLITTING, region.getRegionName(), serverName); String node = ZKAssign.getNodeName(zkw, region.getEncodedName()); - if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, data.getBytes())) { + if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) { throw new IOException("Failed create of ephemeral " + node); } // Transition node from SPLITTING to SPLITTING and pick up version so we @@ -650,12 +652,12 @@ public class TestAssignmentManager { deadServers); } @Override - void processRegionsInTransition(final RegionTransitionData data, + void processRegionsInTransition(final RegionTransition rt, final HRegionInfo regionInfo, final Map>> deadServers, final int 
expectedVersion) throws KeeperException { while (this.gate.get()) Threads.sleep(1); - super.processRegionsInTransition(data, regionInfo, deadServers, expectedVersion); + super.processRegionsInTransition(rt, regionInfo, deadServers, expectedVersion); } /** reset the watcher */ diff --git src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 2669876..f97cd2c 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -19,7 +19,7 @@ */ package org.apache.hadoop.hbase.master; -import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; +import static org.apache.hadoop.hbase.SplitLogCounters.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -83,7 +82,7 @@ public class TestDistributedLogSplitting { HBaseTestingUtility TEST_UTIL; private void startCluster(int num_rs) throws Exception{ - ZKSplitLog.Counters.resetCounters(); + SplitLogCounters.resetCounters(); LOG.info("Starting cluster"); conf = HBaseConfiguration.create(); conf.getLong("hbase.splitlog.max.resubmit", 0); diff --git src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java index 14cdb90..59ad92c 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java +++ 
src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java @@ -37,7 +37,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.executor.EventHandler.EventType; -import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.master.AssignmentManager.RegionState; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -475,8 +474,9 @@ public class TestMasterFailover { ZKAssign.createNodeOffline(zkw, region, serverName); hrs.openRegion(region); while (true) { - RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName()); - if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) { + byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); + RegionTransition rt = RegionTransition.parseFrom(bytes); + if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) { break; } Thread.sleep(100); @@ -488,8 +488,9 @@ public class TestMasterFailover { ZKAssign.createNodeOffline(zkw, region, serverName); hrs.openRegion(region); while (true) { - RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName()); - if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) { + byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); + RegionTransition rt = RegionTransition.parseFrom(bytes); + if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) { break; } Thread.sleep(100); @@ -835,8 +836,9 @@ public class TestMasterFailover { ZKAssign.createNodeOffline(zkw, region, deadServerName); hrsDead.openRegion(region); while (true) { - RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName()); - if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) { + byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); + RegionTransition rt = RegionTransition.parseFrom(bytes); + 
if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) { break; } Thread.sleep(100); @@ -850,8 +852,9 @@ public class TestMasterFailover { ZKAssign.createNodeOffline(zkw, region, deadServerName); hrsDead.openRegion(region); while (true) { - RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName()); - if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) { + byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); + RegionTransition rt = RegionTransition.parseFrom(bytes); + if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) { break; } Thread.sleep(100); @@ -869,8 +872,9 @@ public class TestMasterFailover { ZKAssign.createNodeOffline(zkw, region, deadServerName); hrsDead.openRegion(region); while (true) { - RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName()); - if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) { + byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); + RegionTransition rt = RegionTransition.parseFrom(bytes); + if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) { ZKAssign.deleteOpenedNode(zkw, region.getEncodedName()); break; } @@ -885,8 +889,9 @@ public class TestMasterFailover { ZKAssign.createNodeOffline(zkw, region, deadServerName); hrsDead.openRegion(region); while (true) { - RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName()); - if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) { + byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName()); + RegionTransition rt = RegionTransition.parseFrom(bytes); + if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) { ZKAssign.deleteOpenedNode(zkw, region.getEncodedName()); break; } diff --git src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java index f8029ba..d16d156 100644 --- 
src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java +++ src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java @@ -27,6 +27,7 @@ import java.net.UnknownHostException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -227,10 +228,11 @@ public class TestMasterNoCluster { * @throws IOException * @throws KeeperException * @throws InterruptedException + * @throws DeserializationException */ @Test public void testCatalogDeploys() - throws IOException, KeeperException, InterruptedException { + throws IOException, KeeperException, InterruptedException, DeserializationException { final Configuration conf = TESTUTIL.getConfiguration(); final long now = System.currentTimeMillis(); // Name for our single mocked up regionserver. diff --git src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 0f7d54e..9857aa4 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -19,27 +19,43 @@ */ package org.apache.hadoop.hbase.master; -import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; +import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result; +import static 
org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.Arrays; -import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; -import static org.junit.Assert.*; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.regionserver.TestMasterAddressManager.NodeCreationListener; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.log4j.Level; @@ -57,6 
+73,7 @@ import org.junit.experimental.categories.Category; @Category(MediumTests.class) public class TestSplitLogManager { private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); + private final ServerName DUMMY_MASTER = new ServerName("dummy-master,1,1"); static { Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); } @@ -180,16 +197,16 @@ public class TestSplitLogManager { @Test public void testTaskCreation() throws Exception { LOG.info("TestTaskCreation - test the creation of a task in zk"); - - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); byte[] data = ZKUtil.getData(zkw, tasknode); - LOG.info("Task node created " + new String(data)); - assertTrue(TaskState.TASK_UNASSIGNED.equals(data, "dummy-master")); + SplitLogTask slt = SplitLogTask.parseFrom(data); + LOG.info("Task node created " + slt.toString()); + assertTrue(slt.isUnassigned(DUMMY_MASTER)); } @Test @@ -197,8 +214,8 @@ public class TestSplitLogManager { LOG.info("TestOrphanTaskAcquisition"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); - zkw.getRecoverableZooKeeper().create(tasknode, - TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, + SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER); + zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int to = 1000; @@ -207,7 +224,7 @@ public class TestSplitLogManager { to = to + 2 * 100; - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); Task task = slm.findOrCreateOrphanTask(tasknode); @@ -229,12 +246,12 @@ public class TestSplitLogManager { " 
startup"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); //create an unassigned orphan task - zkw.getRecoverableZooKeeper().create(tasknode, - TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, + SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER); + zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); Task task = slm.findOrCreateOrphanTask(tasknode); @@ -263,24 +280,29 @@ public class TestSplitLogManager { to = to + 2 * 100; conf.setInt("hbase.splitlog.max.resubmit", 2); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); int version = ZKUtil.checkExists(zkw, tasknode); - - ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); + final ServerName worker1 = new ServerName("worker1,1,1"); + final ServerName worker2 = new ServerName("worker2,1,1"); + final ServerName worker3 = new ServerName("worker3,1,1"); + SplitLogTask slt = new SplitLogTask.Owned(worker1); + ZKUtil.setData(zkw, tasknode, slt.toByteArray()); waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); waitForCounter(tot_mgr_resubmit, 0, 1, to + EXTRA_TOLERANCE_MS); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); - ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker2")); + slt = new SplitLogTask.Owned(worker2); + ZKUtil.setData(zkw, tasknode, slt.toByteArray()); waitForCounter(tot_mgr_heartbeat, 1, 2, 1000); waitForCounter(tot_mgr_resubmit, 1, 2, to + 100); int version2 = 
ZKUtil.checkExists(zkw, tasknode); assertTrue(version2 > version1); - ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker3")); + slt = new SplitLogTask.Owned(worker3); + ZKUtil.setData(zkw, tasknode, slt.toByteArray()); waitForCounter(tot_mgr_heartbeat, 1, 2, 1000); waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + EXTRA_TOLERANCE_MS); Thread.sleep(to + EXTRA_TOLERANCE_MS); @@ -293,14 +315,15 @@ public class TestSplitLogManager { conf.setInt("hbase.splitlog.manager.timeout", 1000); conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); int version = ZKUtil.checkExists(zkw, tasknode); - - ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); + final ServerName worker1 = new ServerName("worker1,1,1"); + SplitLogTask slt = new SplitLogTask.Owned(worker1); + ZKUtil.setData(zkw, tasknode, slt.toByteArray()); waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); waitForCounter(new Expr() { @Override @@ -312,8 +335,8 @@ public class TestSplitLogManager { int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); byte[] taskstate = ZKUtil.getData(zkw, tasknode); - assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), - taskstate)); + slt = SplitLogTask.parseFrom(taskstate); + assertTrue(slt.isUnassigned(DUMMY_MASTER)); waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); } else { @@ -327,11 +350,13 @@ public class TestSplitLogManager { public void testTaskDone() throws Exception { LOG.info("TestTaskDone - cleanup task node once in DONE state"); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new 
TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); - ZKUtil.setData(zkw, tasknode, TaskState.TASK_DONE.get("worker")); + final ServerName worker1 = new ServerName("worker1,1,1"); + SplitLogTask slt = new SplitLogTask.Done(worker1); + ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { while (batch.installed != batch.done) { batch.wait(); @@ -346,12 +371,14 @@ public class TestSplitLogManager { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); - ZKUtil.setData(zkw, tasknode, TaskState.TASK_ERR.get("worker")); + final ServerName worker1 = new ServerName("worker1,1,1"); + SplitLogTask slt = new SplitLogTask.Err(worker1); + ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { while (batch.installed != batch.error) { batch.wait(); @@ -359,18 +386,20 @@ public class TestSplitLogManager { } waitForCounter(tot_mgr_task_deleted, 0, 1, 1000); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); - conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT); + conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT); } @Test public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); - ZKUtil.setData(zkw, tasknode, TaskState.TASK_RESIGNED.get("worker")); + final ServerName worker1 = new ServerName("worker1,1,1"); + SplitLogTask slt = new 
SplitLogTask.Resigned(worker1); + ZKUtil.setData(zkw, tasknode, slt.toByteArray()); int version = ZKUtil.checkExists(zkw, tasknode); waitForCounter(tot_mgr_resubmit, 0, 1, 1000); @@ -378,8 +407,8 @@ public class TestSplitLogManager { assertTrue(version1 > version); byte[] taskstate = ZKUtil.getData(zkw, tasknode); - assertTrue(Arrays.equals(taskstate, - TaskState.TASK_UNASSIGNED.get("dummy-master"))); + slt = SplitLogTask.parseFrom(taskstate); + assertTrue(slt.isUnassigned(DUMMY_MASTER)); } @Test @@ -389,8 +418,9 @@ public class TestSplitLogManager { // create an orphan task in OWNED state String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1"); - zkw.getRecoverableZooKeeper().create(tasknode1, - TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, + final ServerName worker1 = new ServerName("worker1,1,1"); + SplitLogTask slt = new SplitLogTask.Owned(worker1); + zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int to = 1000; @@ -399,7 +429,7 @@ public class TestSplitLogManager { conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); @@ -411,8 +441,9 @@ public class TestSplitLogManager { // keep updating the orphan owned node every to/2 seconds for (int i = 0; i < (3 * to)/100; i++) { Thread.sleep(100); - ZKUtil.setData(zkw, tasknode1, - TaskState.TASK_OWNED.get("dummy-worker")); + final ServerName worker2 = new ServerName("worker1,1,1"); + slt = new SplitLogTask.Owned(worker2); + ZKUtil.setData(zkw, tasknode1, slt.toByteArray()); } // since we have stopped heartbeating the owned node therefore it should @@ -429,14 +460,15 @@ public class TestSplitLogManager { LOG.info("testDeadWorker"); conf.setLong("hbase.splitlog.max.resubmit", 0); - slm = new 
SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); int version = ZKUtil.checkExists(zkw, tasknode); - - ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); + final ServerName worker1 = new ServerName("worker1,1,1"); + SplitLogTask slt = new SplitLogTask.Owned(worker1); + ZKUtil.setData(zkw, tasknode, slt.toByteArray()); waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); slm.handleDeadWorker("worker1"); waitForCounter(tot_mgr_resubmit, 0, 1, 1000); @@ -445,15 +477,15 @@ public class TestSplitLogManager { int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); byte[] taskstate = ZKUtil.getData(zkw, tasknode); - assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), - taskstate)); + slt = SplitLogTask.parseFrom(taskstate); + assertTrue(slt.isUnassigned(DUMMY_MASTER)); return; } @Test public void testEmptyLogDir() throws Exception { LOG.info("testEmptyLogDir"); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); FileSystem fs = TEST_UTIL.getTestFileSystem(); Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), @@ -467,7 +499,7 @@ public class TestSplitLogManager { public void testVanishingTaskZNode() throws Exception { LOG.info("testVanishingTaskZNode"); conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); slm.finishInitialization(); FileSystem fs = TEST_UTIL.getTestFileSystem(); final Path logDir = new Path(fs.getWorkingDirectory(), diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java 
src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index 26b9865..324b7a6 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -19,12 +19,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.resetCounters; -import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_lost_race; -import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_owned; -import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_preempt_task; -import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_task_acquired; -import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_task_acquired_rescan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -35,10 +29,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SplitLogCounters; +import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.log4j.Level; @@ -53,6 +48,7 @@ import org.junit.experimental.categories.Category; @Category(MediumTests.class) public class TestSplitLogWorker { private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class); + private final ServerName MANAGER = new ServerName("manager,1,1"); static { 
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); } @@ -98,7 +94,7 @@ public class TestSplitLogWorker { ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode); assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1); LOG.debug(zkw.splitLogZNode + " created"); - resetCounters(); + SplitLogCounters.resetCounters(); } @@ -129,19 +125,20 @@ public class TestSplitLogWorker { @Test public void testAcquireTaskAtStartup() throws Exception { LOG.info("testAcquireTaskAtStartup"); - ZKSplitLog.Counters.resetCounters(); - - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"), - TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE, + SplitLogCounters.resetCounters(); + final String TATAS = "tatas"; + final ServerName RS = new ServerName("rs,1,1"); + zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS), + new SplitLogTask.Unassigned(new ServerName("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), - "rs", neverEndingTask); + SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), RS, neverEndingTask); slw.start(); try { - waitForCounter(tot_wkr_task_acquired, 0, 1, 100); - assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, - ZKSplitLog.getEncodedNodeName(zkw, "tatas")), "rs")); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 100); + byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS)); + SplitLogTask slt = SplitLogTask.parseFrom(bytes); + assertTrue(slt.isOwned(RS)); } finally { stopSplitLogWorker(slw); } @@ -161,28 +158,27 @@ public class TestSplitLogWorker { @Test public void testRaceForTask() throws Exception { LOG.info("testRaceForTask"); - ZKSplitLog.Counters.resetCounters(); - - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"), - TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + 
SplitLogCounters.resetCounters(); + final String TRFT = "trft"; + final ServerName SVR1 = new ServerName("svr1,1,1"); + final ServerName SVR2 = new ServerName("svr2,1,1"); + zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT), + new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), - "svr1", neverEndingTask); - SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), - "svr2", neverEndingTask); + SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR1, neverEndingTask); + SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR2, neverEndingTask); slw1.start(); slw2.start(); try { - waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000); // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if // not it, that we fell through to the next counter in line and it was set. 
- assertTrue(waitForCounterBoolean(tot_wkr_failed_to_grab_task_owned, 0, 1, 1000) || - tot_wkr_failed_to_grab_task_lost_race.get() == 1); - assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, - ZKSplitLog.getEncodedNodeName(zkw, "trft")), "svr1") || - TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, - ZKSplitLog.getEncodedNodeName(zkw, "trft")), "svr2")); + assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1000) || + SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1); + byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT)); + SplitLogTask slt = SplitLogTask.parseFrom(bytes); + assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2)); } finally { stopSplitLogWorker(slw1); stopSplitLogWorker(slw2); @@ -192,28 +188,28 @@ public class TestSplitLogWorker { @Test public void testPreemptTask() throws Exception { LOG.info("testPreemptTask"); - ZKSplitLog.Counters.resetCounters(); - - SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), - "tpt_svr", neverEndingTask); + SplitLogCounters.resetCounters(); + final ServerName SRV = new ServerName("tpt_svr,1,1"); + final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"); + SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask); slw.start(); try { Thread.yield(); // let the worker start Thread.sleep(100); // this time create a task node after starting the splitLogWorker - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"), - TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(PATH, + new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000); assertEquals(1, slw.taskReadySeq); - assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, - 
ZKSplitLog.getEncodedNodeName(zkw, "tpt_task")), "tpt_svr")); - - ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"), - TaskState.TASK_UNASSIGNED.get("manager")); - waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + byte [] bytes = ZKUtil.getData(zkw, PATH); + SplitLogTask slt = SplitLogTask.parseFrom(bytes); + assertTrue(slt.isOwned(SRV)); + slt = new SplitLogTask.Unassigned(MANAGER); + ZKUtil.setData(zkw, PATH, slt.toByteArray()); + waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1000); } finally { stopSplitLogWorker(slw); } @@ -222,35 +218,37 @@ public class TestSplitLogWorker { @Test public void testMultipleTasks() throws Exception { LOG.info("testMultipleTasks"); - ZKSplitLog.Counters.resetCounters(); - SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), - "tmt_svr", neverEndingTask); + SplitLogCounters.resetCounters(); + final ServerName SRV = new ServerName("tmt_svr,1,1"); + final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"); + SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask); slw.start(); try { Thread.yield(); // let the worker start Thread.sleep(100); + SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER); + zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"), - TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - - waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000); // now the worker is busy doing the above task // create another task - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"), - TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + final String PATH2 = 
ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"); + zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // preempt the first task, have it owned by another worker - ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"), - TaskState.TASK_OWNED.get("another-worker")); - waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + final ServerName anotherWorker = new ServerName("another-worker,1,1"); + SplitLogTask slt = new SplitLogTask.Owned(anotherWorker); + ZKUtil.setData(zkw, PATH1, slt.toByteArray()); + waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1000); - waitForCounter(tot_wkr_task_acquired, 1, 2, 1000); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 1000); assertEquals(2, slw.taskReadySeq); - assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, - ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2")), "tmt_svr")); + byte [] bytes = ZKUtil.getData(zkw, PATH2); + slt = SplitLogTask.parseFrom(bytes); + assertTrue(slt.isOwned(SRV)); } finally { stopSplitLogWorker(slw); } @@ -259,38 +257,37 @@ public class TestSplitLogWorker { @Test public void testRescan() throws Exception { LOG.info("testRescan"); - ZKSplitLog.Counters.resetCounters(); - slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), - "svr", neverEndingTask); + SplitLogCounters.resetCounters(); + final ServerName SRV = new ServerName("svr,1,1"); + slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask); slw.start(); Thread.yield(); // let the worker start Thread.sleep(100); String task = ZKSplitLog.getEncodedNodeName(zkw, "task"); - zkw.getRecoverableZooKeeper().create(task, - TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER); + zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + 
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000); // now the worker is busy doing the above task // preempt the task, have it owned by another worker - ZKUtil.setData(zkw, task, TaskState.TASK_UNASSIGNED.get("manager")); - waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + ZKUtil.setData(zkw, task, slt.toByteArray()); + waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1000); // create a RESCAN node String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"); - rescan = zkw.getRecoverableZooKeeper().create(rescan, - TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); - waitForCounter(tot_wkr_task_acquired, 1, 2, 1000); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 1000); // RESCAN node might not have been processed if the worker became busy // with the above task. preempt the task again so that now the RESCAN // node is processed - ZKUtil.setData(zkw, task, TaskState.TASK_UNASSIGNED.get("manager")); - waitForCounter(tot_wkr_preempt_task, 1, 2, 1000); - waitForCounter(tot_wkr_task_acquired_rescan, 0, 1, 1000); + ZKUtil.setData(zkw, task, slt.toByteArray()); + waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, 1000); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, 1000); List nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode); LOG.debug(nodes); @@ -301,8 +298,8 @@ public class TestSplitLogWorker { String name = ZKSplitLog.getEncodedNodeName(zkw, node); String fn = ZKSplitLog.getFileName(name); byte [] data = ZKUtil.getData(zkw, ZKUtil.joinZNode(zkw.splitLogZNode, fn)); - String datastr = Bytes.toString(data); - assertTrue("data=" + datastr, TaskState.TASK_DONE.equals(data, "svr")); + slt = SplitLogTask.parseFrom(data); + assertTrue(slt.toString(), slt.isDone(SRV)); } } assertEquals(2, num); @@ -311,5 +308,4 @@ public class TestSplitLogWorker { 
@org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - +} \ No newline at end of file diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 75b5aea..65fa948 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.executor.EventHandler.EventType; -import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.master.handler.SplitRegionHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; @@ -98,9 +97,10 @@ public class TestSplitTransactionOnCluster { * @throws InterruptedException * @throws NodeExistsException * @throws KeeperException + * @throws DeserializationException */ @Test (timeout = 300000) public void testRSSplitEphemeralsDisappearButDaughtersAreOnlinedAfterShutdownHandling() - throws IOException, InterruptedException, NodeExistsException, KeeperException { + throws IOException, InterruptedException, NodeExistsException, KeeperException, DeserializationException { final byte [] tableName = Bytes.toBytes("ephemeral"); @@ -137,12 +137,12 @@ public class TestSplitTransactionOnCluster { Stat stats = TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats); - RegionTransitionData rtd = - ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(), - hri.getEncodedName()); + RegionTransition rt = + 
RegionTransition.parseFrom(ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(), + hri.getEncodedName())); // State could be SPLIT or SPLITTING. - assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) || - rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLITTING)); + assertTrue(rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) || + rt.getEventType().equals(EventType.RS_ZK_REGION_SPLITTING)); // Now crash the server cluster.abortRegionServer(tableRegionIndex); waitUntilRegionServerDead(); diff --git src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java index 07f8fc4..567ef10 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java @@ -25,14 +25,15 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.executor.EventHandler.EventType; -import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; @@ -133,9 +134,10 @@ public class TestCloseRegionHandler { * @throws IOException * @throws NodeExistsException * @throws KeeperException + * @throws DeserializationException */ @Test public void testZKClosingNodeVersionMismatch() - throws IOException, NodeExistsException, KeeperException 
{ + throws IOException, NodeExistsException, KeeperException, DeserializationException { final Server server = new MockServer(HTU); final MockRegionServerServices rss = new MockRegionServerServices(); rss.setFileSystem(HTU.getTestFileSystem()); @@ -160,9 +162,9 @@ public class TestCloseRegionHandler { handler.process(); // Handler should remain in M_ZK_REGION_CLOSING - RegionTransitionData data = - ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName()); - assertTrue(EventType.M_ZK_REGION_CLOSING == data.getEventType()); + RegionTransition rt = + RegionTransition.parseFrom(ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName())); + assertTrue(rt.getEventType().equals(EventType.M_ZK_REGION_CLOSING )); } /** @@ -170,9 +172,10 @@ public class TestCloseRegionHandler { * @throws IOException * @throws NodeExistsException * @throws KeeperException + * @throws DeserializationException */ @Test public void testCloseRegion() - throws IOException, NodeExistsException, KeeperException { + throws IOException, NodeExistsException, KeeperException, DeserializationException { final Server server = new MockServer(HTU); final MockRegionServerServices rss = new MockRegionServerServices(); rss.setFileSystem(HTU.getTestFileSystem()); @@ -196,26 +199,23 @@ public class TestCloseRegionHandler { versionOfClosingNode); handler.process(); // Handler should have transitioned it to RS_ZK_REGION_CLOSED - RegionTransitionData data = - ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName()); - assertTrue(EventType.RS_ZK_REGION_CLOSED == data.getEventType()); + RegionTransition rt = RegionTransition.parseFrom( + ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName())); + assertTrue(rt.getEventType().equals(EventType.RS_ZK_REGION_CLOSED)); } + private void OpenRegion(Server server, RegionServerServices rss, - HTableDescriptor htd, HRegionInfo hri) - throws IOException, NodeExistsException, KeeperException { - // Create it OFFLINE node, which is what Master set before 
sending OPEN RPC - ZKAssign.createNodeOffline(server.getZooKeeper(), hri, - server.getServerName()); - OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri, - htd); - openHandler.process(); - RegionTransitionData data = - ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName()); - - // delete the node, which is what Master do after the region is opened - ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), - EventType.RS_ZK_REGION_OPENED); - } + HTableDescriptor htd, HRegionInfo hri) + throws IOException, NodeExistsException, KeeperException, DeserializationException { + // Create it OFFLINE node, which is what Master set before sending OPEN RPC + ZKAssign.createNodeOffline(server.getZooKeeper(), hri, server.getServerName()); + OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri, htd); + openHandler.process(); + // This parse is not used? + RegionTransition.parseFrom(ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName())); + // delete the node, which is what Master do after the region is opened + ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), EventType.RS_ZK_REGION_OPENED); + } @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = diff --git src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java index 55a8c4a..0e7cfee 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java @@ -26,7 +26,6 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import 
org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -148,9 +147,9 @@ public class TestOpenRegionHandler { handler.process(); // Handler should have transitioned it to FAILED_OPEN - RegionTransitionData data = - ZKAssign.getData(server.getZooKeeper(), TEST_HRI.getEncodedName()); - assertEquals(EventType.RS_ZK_REGION_FAILED_OPEN, data.getEventType()); + RegionTransition rt = RegionTransition.parseFrom( + ZKAssign.getData(server.getZooKeeper(), TEST_HRI.getEncodedName())); + assertEquals(EventType.RS_ZK_REGION_FAILED_OPEN, rt.getEventType()); } @Test @@ -173,9 +172,9 @@ public class TestOpenRegionHandler { handler.process(); // Handler should have transitioned it to FAILED_OPEN - RegionTransitionData data = - ZKAssign.getData(server.getZooKeeper(), TEST_HRI.getEncodedName()); - assertEquals(EventType.RS_ZK_REGION_FAILED_OPEN, data.getEventType()); + RegionTransition rt = RegionTransition.parseFrom( + ZKAssign.getData(server.getZooKeeper(), TEST_HRI.getEncodedName())); + assertEquals(EventType.RS_ZK_REGION_FAILED_OPEN, rt.getEventType()); } diff --git src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java index 4314572..d081968 100644 --- src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java +++ src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.Delete; @@ -59,7 +60,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.EventHandler.EventType; -import 
org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -920,9 +920,9 @@ public class TestHBaseFsck { int iTimes = 0; while (true) { - RegionTransitionData rtd = ZKAssign.getData(zkw, - region.getEncodedName()); - if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) { + RegionTransition rt = RegionTransition.parseFrom(ZKAssign.getData(zkw, + region.getEncodedName())); + if (rt != null && rt.getEventType() == EventType.RS_ZK_REGION_OPENED) { break; } Thread.sleep(100);