diff --git src/main/java/org/apache/hadoop/hbase/DeserializationException.java src/main/java/org/apache/hadoop/hbase/DeserializationException.java
new file mode 100644
index 0000000..b85edb3
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/DeserializationException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Failed deserialization.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("serial")
+public class DeserializationException extends HBaseException {
+ public DeserializationException() {
+ super();
+ }
+
+ public DeserializationException(final String message) {
+ super(message);
+ }
+
+ public DeserializationException(final String message, final Throwable t) {
+ super(message, t);
+ }
+
+ public DeserializationException(final Throwable t) {
+ super(t);
+ }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/EmptyWatcher.java src/main/java/org/apache/hadoop/hbase/EmptyWatcher.java
deleted file mode 100644
index 9881ec2..0000000
--- src/main/java/org/apache/hadoop/hbase/EmptyWatcher.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Copyright 2009 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-
-/**
- * An empty ZooKeeper watcher
- */
-@InterfaceAudience.Private
-public class EmptyWatcher implements Watcher {
- public static EmptyWatcher instance = new EmptyWatcher();
- private EmptyWatcher() {}
-
- public void process(WatchedEvent event) {}
-}
diff --git src/main/java/org/apache/hadoop/hbase/HBaseException.java src/main/java/org/apache/hadoop/hbase/HBaseException.java
new file mode 100644
index 0000000..1b5ac3a
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/HBaseException.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Base checked exception in HBase.
+ * @see <a href="https://issues.apache.org/jira/browse/HBASE-5796">HBASE-5796</a>
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Private
+public class HBaseException extends Exception {
+ public HBaseException() {
+ super();
+ }
+
+ public HBaseException(final String message) {
+ super(message);
+ }
+
+ public HBaseException(final String message, final Throwable t) {
+ super(message, t);
+ }
+
+ public HBaseException(final Throwable t) {
+ super(t);
+ }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/RegionTransition.java src/main/java/org/apache/hadoop/hbase/RegionTransition.java
new file mode 100644
index 0000000..582575d
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/RegionTransition.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Current state of a region in transition. Holds state of a region as it moves through the
+ * steps that take it from offline to open, etc. Used by regionserver, master, and zk packages.
+ * Encapsulates protobuf serialization/deserialization so we don't leak generated pb outside this
+ * class. Create an instance using {@link #createRegionTransition(EventType, byte[], ServerName)}.
+ *
+ * Immutable
+ */
+@InterfaceAudience.Private
+public class RegionTransition {
+ private final ZooKeeperProtos.RegionTransition rt;
+
+ /**
+ * Shutdown constructor
+ */
+ private RegionTransition() {
+ this(null);
+ }
+
+ private RegionTransition(final ZooKeeperProtos.RegionTransition rt) {
+ this.rt = rt;
+ }
+
+ public EventHandler.EventType getEventType() {
+ return EventHandler.EventType.get(this.rt.getEventTypeCode());
+ }
+
+ public ServerName getServerName() {
+ return ProtobufUtil.toServerName(this.rt.getOriginServerName());
+ }
+
+ public long getCreateTime() {
+ return this.rt.getCreateTime();
+ }
+
+ /**
+ * @return Full region name
+ */
+ public byte [] getRegionName() {
+ return this.rt.getRegionName().toByteArray();
+ }
+
+ public byte [] getPayload() {
+ return this.rt.getPayload().toByteArray();
+ }
+
+ @Override
+ public String toString() {
+ byte [] payload = getPayload();
+ return "region=" + Bytes.toStringBinary(getRegionName()) + ", state=" + getEventType() +
+ ", servername=" + getServerName() + ", createTime=" + this.getCreateTime() +
+ ", payload.length=" + (payload == null? 0: payload.length);
+ }
+
+ /**
+ * @param type
+ * @param regionName
+ * @param sn
+ * @return a {@link RegionTransition} instance built from the passed arguments
+ * @see #parseFrom(byte[])
+ */
+ public static RegionTransition createRegionTransition(final EventType type,
+ final byte [] regionName, final ServerName sn) {
+ return createRegionTransition(type, regionName, sn, null);
+ }
+
+ /**
+ * @param type
+ * @param regionName
+ * @param sn
+ * @param payload May be null
+ * @return a {@link RegionTransition} instance built from the passed arguments
+ * @see #parseFrom(byte[])
+ */
+ public static RegionTransition createRegionTransition(final EventType type,
+ final byte [] regionName, final ServerName sn, final byte [] payload) {
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName pbsn =
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder().
+ setHostName(sn.getHostname()).setPort(sn.getPort()).setStartCode(sn.getStartcode()).build();
+ ZooKeeperProtos.RegionTransition.Builder builder = ZooKeeperProtos.RegionTransition.newBuilder().
+ setEventTypeCode(type.getCode()).setRegionName(ByteString.copyFrom(regionName)).
+ setOriginServerName(pbsn);
+ if (payload != null) builder.setPayload(ByteString.copyFrom(payload));
+ return new RegionTransition(builder.build());
+ }
+
+ /**
+ * @param data Serialized data to parse.
+ * @return A RegionTransition instance made of the passed data
+ * @throws DeserializationException
+ * @see #toByteArray()
+ */
+ public static RegionTransition parseFrom(final byte [] data) throws DeserializationException {
+ ProtobufUtil.expectPBMagicPrefix(data);
+ try {
+ int prefixLen = ProtobufUtil.lengthOfPBMagic();
+ ZooKeeperProtos.RegionTransition rt = ZooKeeperProtos.RegionTransition.newBuilder().
+ mergeFrom(data, prefixLen, data.length - prefixLen).build();
+ return new RegionTransition(rt);
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ }
+
+ /**
+ * @return This instance serialized into a byte array
+ * @see #parseFrom(byte[])
+ */
+ public byte [] toByteArray() {
+ return ProtobufUtil.prependPBMagic(this.rt.toByteArray());
+ }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/ServerName.java src/main/java/org/apache/hadoop/hbase/ServerName.java
index 8fdb624..1a40153 100644
--- src/main/java/org/apache/hadoop/hbase/ServerName.java
+++ src/main/java/org/apache/hadoop/hbase/ServerName.java
@@ -24,9 +24,13 @@ import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RootRegionServer;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
+import com.google.protobuf.InvalidProtocolBufferException;
+
/**
* Instance of an HBase ServerName.
* A server name is used uniquely identifying a server instance and is made
@@ -296,4 +300,47 @@ public class ServerName implements Comparable<ServerName> {
return SERVERNAME_PATTERN.matcher(str).matches()? new ServerName(str):
new ServerName(str, NON_STARTCODE);
}
-}
+
+ /**
+ * Get a ServerName from the passed in data bytes.
+ * @param data Data with a serialize server name in it; can handle the old style
+ * servername where servername was host and port. Works too with data that
+ * begins w/ the pb 'PBUF' magic and that is then followed by a protobuf that
+ * has a serialized {@link ServerName} in it.
+ * @return Returns null if data is null else converts passed data
+ * to a ServerName instance.
+ * @throws DeserializationException
+ */
+ public static ServerName parseFrom(final byte [] data) throws DeserializationException {
+ if (data == null || data.length <= 0) return null;
+ if (ProtobufUtil.isPBMagicPrefix(data)) {
+ int prefixLen = ProtobufUtil.lengthOfPBMagic();
+ try {
+ RootRegionServer rss =
+ RootRegionServer.newBuilder().mergeFrom(data, prefixLen, data.length - prefixLen).build();
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName sn = rss.getServer();
+ return new ServerName(sn.getHostName(), sn.getPort(), sn.getStartCode());
+ } catch (InvalidProtocolBufferException e) {
+ // A failed parse of the znode is pretty catastrophic. Rather than loop
+ // retrying hoping the bad bytes will change, and rather than change
+ // the signature on this method to add an IOE which will send ripples all
+ // over the code base, throw a DeserializationException. This should
+ // "never" happen. Fail fast if it does.
+ throw new DeserializationException(e);
+ }
+ }
+ // The str returned could be old style -- pre hbase-1502 -- which was
+ // hostname and port separated by a colon rather than hostname, port and
+ // startcode delimited by a ','.
+ String str = Bytes.toString(data);
+ int index = str.indexOf(ServerName.SERVERNAME_SEPARATOR);
+ if (index != -1) {
+ // Presume its ServerName serialized with versioned bytes.
+ return ServerName.parseVersionedServerName(data);
+ }
+ // Presume it a hostname:port format.
+ String hostname = Addressing.parseHostname(str);
+ int port = Addressing.parsePort(str);
+ return new ServerName(hostname, port, -1L);
+ }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
new file mode 100644
index 0000000..45e385c
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
@@ -0,0 +1,102 @@
+package org.apache.hadoop.hbase;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Counters kept by the distributed WAL split log process.
+ * Used by master and regionserver packages.
+ */
+@InterfaceAudience.Private
+public class SplitLogCounters {
+ //SplitLogManager counters
+ public static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0);
+ public static AtomicLong tot_mgr_log_split_batch_success =
+ new AtomicLong(0);
+ public static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0);
+ public static AtomicLong tot_mgr_new_unexpected_hlogs = new AtomicLong(0);
+ public static AtomicLong tot_mgr_log_split_start = new AtomicLong(0);
+ public static AtomicLong tot_mgr_log_split_success = new AtomicLong(0);
+ public static AtomicLong tot_mgr_log_split_err = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_create_result = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_create_err = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
+ public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
+ public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
+ public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0);
+ public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
+ public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0);
+ public static AtomicLong tot_mgr_resubmit = new AtomicLong(0);
+ public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
+ public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
+ public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
+ public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0);
+ public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
+ public static AtomicLong tot_mgr_resubmit_threshold_reached =
+ new AtomicLong(0);
+ public static AtomicLong tot_mgr_missing_state_in_delete =
+ new AtomicLong(0);
+ public static AtomicLong tot_mgr_heartbeat = new AtomicLong(0);
+ public static AtomicLong tot_mgr_rescan = new AtomicLong(0);
+ public static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0);
+ public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0);
+ public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0);
+ public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0);
+ public static AtomicLong tot_mgr_resubmit_dead_server_task =
+ new AtomicLong(0);
+
+ // SplitLogWorker counters
+ public static AtomicLong tot_wkr_failed_to_grab_task_no_data =
+ new AtomicLong(0);
+ public static AtomicLong tot_wkr_failed_to_grab_task_exception =
+ new AtomicLong(0);
+ public static AtomicLong tot_wkr_failed_to_grab_task_owned =
+ new AtomicLong(0);
+ public static AtomicLong tot_wkr_failed_to_grab_task_lost_race =
+ new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_acquired = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_resigned = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_done = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_err = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0);
+ public static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0);
+ public static AtomicLong tot_wkr_get_data_result = new AtomicLong(0);
+ public static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0);
+ public static AtomicLong tot_wkr_preempt_task = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0);
+ public static AtomicLong tot_wkr_final_transistion_failed =
+ new AtomicLong(0);
+
+ public static void resetCounters() throws Exception {
+ Class<?> cl = (new SplitLogCounters()).getClass();
+ Field[] flds = cl.getDeclaredFields();
+ for (Field fld : flds) {
+ ((AtomicLong)fld.get(null)).set(0);
+ }
+ }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/SplitLogTask.java src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
new file mode 100644
index 0000000..1f16e35
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * State of a WAL log split during distributed splitting. State is kept up in zookeeper.
+ * Encapsulates protobuf serialization/deserialization so we don't leak generated pb outside of
+ * this class. Used by regionserver and master packages.
+ * Immutable
+ */
+@InterfaceAudience.Private
+public class SplitLogTask {
+ private final ServerName originServer;
+ private final ZooKeeperProtos.SplitLogTask.State state;
+
+ public static class Unassigned extends SplitLogTask {
+ public Unassigned(final ServerName originServer) {
+ super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED);
+ }
+ }
+
+ public static class Owned extends SplitLogTask {
+ public Owned(final ServerName originServer) {
+ super(originServer, ZooKeeperProtos.SplitLogTask.State.OWNED);
+ }
+ }
+
+ public static class Resigned extends SplitLogTask {
+ public Resigned(final ServerName originServer) {
+ super(originServer, ZooKeeperProtos.SplitLogTask.State.RESIGNED);
+ }
+ }
+
+ public static class Done extends SplitLogTask {
+ public Done(final ServerName originServer) {
+ super(originServer, ZooKeeperProtos.SplitLogTask.State.DONE);
+ }
+ }
+
+ public static class Err extends SplitLogTask {
+ public Err(final ServerName originServer) {
+ super(originServer, ZooKeeperProtos.SplitLogTask.State.ERR);
+ }
+ }
+
+ SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state) {
+ this.originServer = originServer;
+ this.state = state;
+ }
+
+ public boolean isUnassigned(final ServerName sn) {
+ return this.originServer.equals(sn) && isUnassigned();
+ }
+
+ public boolean isUnassigned() {
+ return this.state == ZooKeeperProtos.SplitLogTask.State.UNASSIGNED;
+ }
+
+ public boolean isOwned(final ServerName sn) {
+ return this.originServer.equals(sn) && isOwned();
+ }
+
+ public boolean isOwned() {
+ return this.state == ZooKeeperProtos.SplitLogTask.State.OWNED;
+ }
+
+ public boolean isResigned(final ServerName sn) {
+ return this.originServer.equals(sn) && isResigned();
+ }
+
+ public boolean isResigned() {
+ return this.state == ZooKeeperProtos.SplitLogTask.State.RESIGNED;
+ }
+
+ public boolean isDone(final ServerName sn) {
+ return this.originServer.equals(sn) && isDone();
+ }
+
+ public boolean isDone() {
+ return this.state == ZooKeeperProtos.SplitLogTask.State.DONE;
+ }
+
+ public boolean isErr(final ServerName sn) {
+ return this.originServer.equals(sn) && isErr();
+ }
+
+ public boolean isErr() {
+ return this.state == ZooKeeperProtos.SplitLogTask.State.ERR;
+ }
+
+ @Override
+ public String toString() {
+ return this.state.toString() + " " + this.originServer.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof SplitLogTask)) return false;
+ SplitLogTask other = (SplitLogTask)obj;
+ return other.state.equals(this.state) && other.originServer.equals(this.originServer);
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 7;
+ hash = 31 * hash + this.state.hashCode();
+ return 31 * hash + this.originServer.hashCode();
+ }
+
+ /**
+ * @param data Serialized data to parse.
+ * @return A SplitLogTask instance made of the passed data
+ * @throws DeserializationException
+ * @see #toByteArray()
+ */
+ public static SplitLogTask parseFrom(final byte [] data) throws DeserializationException {
+ ProtobufUtil.expectPBMagicPrefix(data);
+ try {
+ ZooKeeperProtos.SplitLogTask slts = ZooKeeperProtos.SplitLogTask.newBuilder().
+ mergeFrom(data, ProtobufUtil.lengthOfPBMagic(), data.length - ProtobufUtil.lengthOfPBMagic()).build();
+ return new SplitLogTask(ProtobufUtil.toServerName(slts.getServerName()), slts.getState());
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ }
+
+ /**
+ * @return This instance serialized into a byte array
+ * @see #parseFrom(byte[])
+ */
+ public byte [] toByteArray() {
+ // First create a pb ServerName. Then create a ByteString w/ the TaskState
+ // bytes in it. Finally create a SplitLogTaskState passing in the two
+ // pbs just created.
+ HBaseProtos.ServerName snpb = ProtobufUtil.toServerName(this.originServer);
+ ZooKeeperProtos.SplitLogTask slts =
+ ZooKeeperProtos.SplitLogTask.newBuilder().setServerName(snpb).setState(this.state).build();
+ return ProtobufUtil.prependPBMagic(slts.toByteArray());
+ }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
index 4121508..93d2fbb 100644
--- src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
+++ src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
@@ -105,7 +105,7 @@ public abstract class EventHandler implements Runnable, Comparable<EventHandler> {
public enum EventType {
// Messages originating from RS (NOTE: there is NO direct communication from
// RS to Master). These are a result of RS updates into ZK.
- //RS_ZK_REGION_CLOSING (1), // It is replaced by M_ZK_REGION_CLOSING(HBASE-4739)
+ // RS_ZK_REGION_CLOSING (1), // It is replaced by M_ZK_REGION_CLOSING(HBASE-4739)
RS_ZK_REGION_CLOSED (2), // RS has finished closing a region
RS_ZK_REGION_OPENING (3), // RS is in process of opening a region
RS_ZK_REGION_OPENED (4), // RS has finished opening a region
@@ -140,10 +140,27 @@ public abstract class EventHandler implements Runnable, Comparable {
M_SERVER_SHUTDOWN (70), // Master is processing shutdown of a RS
M_META_SERVER_SHUTDOWN (72); // Master is processing shutdown of RS hosting a meta region (-ROOT- or .META.).
+ private final int code;
+
/**
* Constructor
*/
- EventType(int value) {}
+ EventType(final int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return this.code;
+ }
+
+ public static EventType get(final int code) {
+ // Is this going to be slow? Its used rare but still...
+ for (EventType et: EventType.values()) {
+ if (et.getCode() == code) return et;
+ }
+ throw new IllegalArgumentException("Unknown code " + code);
+ }
+
public boolean isOnlineSchemaChangeSupported() {
return (
this.equals(EventType.C_M_ADD_FAMILY) ||
diff --git src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 06ca377..d898f38 100644
--- src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.executor;
import java.io.IOException;
-import java.io.PrintWriter;
import java.io.Writer;
import java.lang.management.ThreadInfo;
import java.util.List;
@@ -30,8 +29,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -40,7 +37,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
-import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
import com.google.common.collect.Lists;
diff --git src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java
deleted file mode 100644
index 35d7b70..0000000
--- src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.executor;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.executor.EventHandler.EventType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Data serialized into ZooKeeper for region transitions.
- */
-@InterfaceAudience.Private
-public class RegionTransitionData implements Writable {
- /**
- * Type of transition event (offline, opening, opened, closing, closed).
- * Required.
- */
- private EventType eventType;
-
- /** Region being transitioned. Required. */
- private byte [] regionName;
-
- /** Server event originated from. Optional. */
- private ServerName origin;
-
- /** Time the event was created. Required but automatically set. */
- private long stamp;
-
- private byte [] payload;
-
- /**
- * Writable constructor. Do not use directly.
- */
- public RegionTransitionData() {}
-
- /**
- * Construct data for a new region transition event with the specified event
- * type and region name.
- *
- * Used when the server name is not known (the master is setting it). This
- * happens during cluster startup or during failure scenarios. When
- * processing a failed regionserver, the master assigns the regions from that
- * server to other servers though the region was never 'closed'. During
- * master failover, the new master may have regions stuck in transition
- * without a destination so may have to set regions offline and generate a new
- * assignment.
- *
- * <p>Since only the master uses this constructor, the type should always be
- * {@link EventType#M_ZK_REGION_OFFLINE}.
- *
- * @param eventType type of event
- * @param regionName name of region as per HRegionInfo#getRegionName()
- */
- public RegionTransitionData(EventType eventType, byte [] regionName) {
- this(eventType, regionName, null);
- }
-
- /**
- * Construct data for a new region transition event with the specified event
- * type, region name, and server name.
- *
- * <p>Used when the server name is known (a regionserver is setting it).
- *
- * <p>Valid types for this constructor are {@link EventType#M_ZK_REGION_CLOSING},
- * {@link EventType#RS_ZK_REGION_CLOSED}, {@link EventType#RS_ZK_REGION_OPENING},
- * {@link EventType#RS_ZK_REGION_SPLITTING},
- * and {@link EventType#RS_ZK_REGION_OPENED}.
- *
- * @param eventType type of event
- * @param regionName name of region as per HRegionInfo#getRegionName()
- * @param origin Originating {@link ServerName}
- */
- public RegionTransitionData(EventType eventType, byte [] regionName,
- final ServerName origin) {
- this(eventType, regionName, origin, null);
- }
-
- /**
- * Construct data for a new region transition event with the specified event
- * type, region name, and server name.
- *
- * <p>Used when the server name is known (a regionserver is setting it).
- *
- * <p>Valid types for this constructor are {@link EventType#RS_ZK_REGION_SPLIT}
- * since SPLIT is only type that currently carries a payload.
- *
- * @param eventType type of event
- * @param regionName name of region as per HRegionInfo#getRegionName()
- * @param serverName Originating {@link ServerName}
- * @param payload Payload examples include the daughters involved in a
- * {@link EventType#RS_ZK_REGION_SPLIT}. Can be null
- */
- public RegionTransitionData(EventType eventType, byte [] regionName,
- final ServerName serverName, final byte [] payload) {
- this.eventType = eventType;
- this.stamp = System.currentTimeMillis();
- this.regionName = regionName;
- this.origin = serverName;
- this.payload = payload;
- }
-
- /**
- * Gets the type of region transition event.
- *
- * <p>One of:
- * <ul>
- * <li>{@link EventType#M_ZK_REGION_OFFLINE}
- * <li>{@link EventType#M_ZK_REGION_CLOSING}
- * <li>{@link EventType#RS_ZK_REGION_CLOSED}
- * <li>{@link EventType#RS_ZK_REGION_OPENING}
- * <li>{@link EventType#RS_ZK_REGION_OPENED}
- * <li>{@link EventType#RS_ZK_REGION_SPLITTING}
- * <li>{@link EventType#RS_ZK_REGION_SPLIT}
- * </ul>
- * @return type of region transition event
- */
- public EventType getEventType() {
- return eventType;
- }
-
- /**
- * Gets the name of the region being transitioned.
- *
- * Region name is required so this never returns null.
- * @return region name, the result of a call to HRegionInfo#getRegionName()
- */
- public byte [] getRegionName() {
- return regionName;
- }
-
- /**
- * Gets the server the event originated from. If null, this event originated
- * from the master.
- *
- * @return server name of originating regionserver, or null if from master
- */
- public ServerName getOrigin() {
- return origin;
- }
-
- /**
- * Gets the timestamp when this event was created.
- *
- * @return stamp event was created
- */
- public long getStamp() {
- return stamp;
- }
-
- /**
- * @return Payload if any.
- */
- public byte [] getPayload() {
- return this.payload;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- // the event type byte
- eventType = EventType.values()[in.readShort()];
- // the timestamp
- stamp = in.readLong();
- // the encoded name of the region being transitioned
- regionName = Bytes.readByteArray(in);
- // remaining fields are optional so prefixed with boolean
- // the name of the regionserver sending the data
- if (in.readBoolean()) {
- byte [] versionedBytes = Bytes.readByteArray(in);
- this.origin = ServerName.parseVersionedServerName(versionedBytes);
- }
- if (in.readBoolean()) {
- this.payload = Bytes.readByteArray(in);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeShort(eventType.ordinal());
- out.writeLong(System.currentTimeMillis());
- Bytes.writeByteArray(out, regionName);
- // remaining fields are optional so prefixed with boolean
- out.writeBoolean(this.origin != null);
- if (this.origin != null) {
- Bytes.writeByteArray(out, this.origin.getVersionedBytes());
- }
- out.writeBoolean(this.payload != null);
- if (this.payload != null) {
- Bytes.writeByteArray(out, this.payload);
- }
- }
-
- /**
- * Get the bytes for this instance. Throws a {@link RuntimeException} if
- * there is an error deserializing this instance because it represents a code
- * bug.
- * @return binary representation of this instance
- */
- public byte [] getBytes() {
- try {
- return Writables.getBytes(this);
- } catch(IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Get an instance from bytes. Throws a {@link RuntimeException} if
- * there is an error serializing this instance from bytes because it
- * represents a code bug.
- * @param bytes binary representation of this instance
- * @return instance of this class
- */
- public static RegionTransitionData fromBytes(byte [] bytes) {
- try {
- RegionTransitionData data = new RegionTransitionData();
- Writables.getWritable(bytes, data);
- return data;
- } catch(IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public String toString() {
- return "region=" + Bytes.toStringBinary(regionName) + ", origin=" + this.origin +
- ", state=" + eventType;
- }
-}
diff --git src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
index 47e3bd6..3441e46 100644
--- src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
+++ src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -138,7 +139,8 @@ class ActiveMasterManager extends ZooKeeperListener {
// Try to become the active master, watch if there is another master.
// Write out our ServerName as versioned bytes.
try {
- String backupZNode = ZKUtil.joinZNode(this.watcher.backupMasterAddressesZNode, this.sn.toString());
+ String backupZNode =
+ ZKUtil.joinZNode(this.watcher.backupMasterAddressesZNode, this.sn.toString());
if (MasterAddressTracker.setMasterAddress(this.watcher, this.watcher.getMasterAddressZNode(), this.sn)) {
// If we were a backup master before, delete our ZNode from the backup
// master directory since we are the active now
@@ -174,7 +176,14 @@ class ActiveMasterManager extends ZooKeeperListener {
msg = ("A master was detected, but went down before its address " +
"could be read. Attempting to become the next active master");
} else {
- ServerName currentMaster = ZKUtil.znodeContentToServerName(bytes);
+ ServerName currentMaster;
+ try {
+ currentMaster = ServerName.parseFrom(bytes);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse", e);
+ // Hopefully next time around we won't fail the parse. Dangerous.
+ continue;
+ }
if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) {
msg = ("Current master has this master's address, " +
currentMaster + "; master was restarted? Deleting node.");
diff --git src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index f56127d..e3f4ee4 100644
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -48,10 +48,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
@@ -62,7 +64,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
@@ -489,29 +490,34 @@ public class AssignmentManager extends ZooKeeperListener {
* @throws IOException
*/
boolean processRegionInTransition(final String encodedRegionName,
- final HRegionInfo regionInfo,
- final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
+ final HRegionInfo regionInfo, final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
throws KeeperException, IOException {
Stat stat = new Stat();
- RegionTransitionData data = ZKAssign.getDataAndWatch(watcher,
- encodedRegionName, stat);
+ byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
if (data == null) return false;
+ RegionTransition rt;
+ try {
+ rt = RegionTransition.parseFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse znode data", e);
+ return false;
+ }
HRegionInfo hri = regionInfo;
if (hri == null) {
- if ((hri = getHRegionInfo(data)) == null) return false;
+ if ((hri = getHRegionInfo(rt.getRegionName())) == null) return false;
}
- processRegionsInTransition(data, hri, deadServers, stat.getVersion());
+ processRegionsInTransition(rt, hri, deadServers, stat.getVersion());
return true;
}
- void processRegionsInTransition(final RegionTransitionData data,
- final HRegionInfo regionInfo,
- final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
- int expectedVersion)
+ void processRegionsInTransition(final RegionTransition rt, final HRegionInfo regionInfo,
+ final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers, int expectedVersion)
throws KeeperException {
+ EventType et = rt.getEventType();
+ // Get ServerName. Could be null.
+ ServerName sn = rt.getServerName();
String encodedRegionName = regionInfo.getEncodedName();
- LOG.info("Processing region " + regionInfo.getRegionNameAsString() +
- " in state " + data.getEventType());
+ LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et);
synchronized (regionsInTransition) {
RegionState regionState = regionsInTransition.get(encodedRegionName);
if (regionState != null ||
@@ -519,21 +525,19 @@ public class AssignmentManager extends ZooKeeperListener {
// Just return
return;
}
- switch (data.getEventType()) {
+ switch (et) {
case M_ZK_REGION_CLOSING:
// If zk node of the region was updated by a live server skip this
// region and just add it into RIT.
- if (isOnDeadServer(regionInfo, deadServers) &&
- (data.getOrigin() == null || !serverManager.isServerOnline(data.getOrigin()))) {
+ if (isOnDeadServer(regionInfo, deadServers) && (sn == null || !isServerOnline(sn))) {
// If was on dead server, its closed now. Force to OFFLINE and this
// will get it reassigned if appropriate
- forceOffline(regionInfo, data);
+ forceOffline(regionInfo, rt);
} else {
// Just insert region into RIT.
// If this never updates the timeout will trigger new assignment
- regionsInTransition.put(encodedRegionName, new RegionState(
- regionInfo, RegionState.State.CLOSING,
- data.getStamp(), data.getOrigin()));
+ regionsInTransition.put(encodedRegionName,
+ getRegionState(regionInfo, RegionState.State.CLOSING, rt));
}
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
@@ -541,27 +545,23 @@ public class AssignmentManager extends ZooKeeperListener {
case RS_ZK_REGION_CLOSED:
case RS_ZK_REGION_FAILED_OPEN:
// Region is closed, insert into RIT and handle it
- addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data);
+ addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
case M_ZK_REGION_OFFLINE:
// If zk node of the region was updated by a live server skip this
// region and just add it into RIT.
- if (isOnDeadServer(regionInfo, deadServers) &&
- (data.getOrigin() == null ||
- !serverManager.isServerOnline(data.getOrigin()))) {
+ if (isOnDeadServer(regionInfo, deadServers) && (sn == null || !isServerOnline(sn))) {
// Region is offline, insert into RIT and handle it like a closed
- addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
- } else if (data.getOrigin() != null &&
- !serverManager.isServerOnline(data.getOrigin())) {
+ addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
+ } else if (sn != null && !isServerOnline(sn)) {
// to handle cases where offline node is created but sendRegionOpen
// RPC is not yet sent
- addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
+ addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
} else {
- regionsInTransition.put(encodedRegionName, new RegionState(
- regionInfo, RegionState.State.PENDING_OPEN, data.getStamp(), data
- .getOrigin()));
+ regionsInTransition.put(encodedRegionName,
+ getRegionState(regionInfo, RegionState.State.PENDING_OPEN, rt));
}
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
@@ -573,9 +573,8 @@ public class AssignmentManager extends ZooKeeperListener {
// Just insert region into RIT
// If this never updates the timeout will trigger new assignment
if (regionInfo.isMetaTable()) {
- regionsInTransition.put(encodedRegionName, new RegionState(
- regionInfo, RegionState.State.OPENING, data.getStamp(), data
- .getOrigin()));
+ regionsInTransition.put(encodedRegionName,
+ getRegionState(regionInfo, RegionState.State.OPENING, rt));
// If ROOT or .META. table is waiting for timeout monitor to assign
// it may take lot of time when the assignment.timeout.period is
// the default value which may be very long. We will not be able
@@ -584,17 +583,15 @@ public class AssignmentManager extends ZooKeeperListener {
processOpeningState(regionInfo);
break;
}
- regionsInTransition.put(encodedRegionName, new RegionState(regionInfo,
- RegionState.State.OPENING, data.getStamp(), data.getOrigin()));
+ regionsInTransition.put(encodedRegionName,
+ getRegionState(regionInfo, RegionState.State.OPENING, rt));
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
case RS_ZK_REGION_OPENED:
// Region is opened, insert into RIT and handle it
- regionsInTransition.put(encodedRegionName, new RegionState(
- regionInfo, RegionState.State.OPEN,
- data.getStamp(), data.getOrigin()));
- ServerName sn = data.getOrigin() == null? null: data.getOrigin();
+ regionsInTransition.put(encodedRegionName,
+ getRegionState(regionInfo, RegionState.State.OPEN, rt));
// sn could be null if this server is no longer online. If
// that is the case, just let this RIT timeout; it'll be assigned
// to new server then.
@@ -605,10 +602,9 @@ public class AssignmentManager extends ZooKeeperListener {
} else if (!serverManager.isServerOnline(sn)
&& (isOnDeadServer(regionInfo, deadServers)
|| regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
- forceOffline(regionInfo, data);
+ forceOffline(regionInfo, rt);
} else {
- new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion)
- .process();
+ new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion).process();
}
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
@@ -623,19 +619,18 @@ public class AssignmentManager extends ZooKeeperListener {
/**
* Put the region hri into an offline state up in zk.
* @param hri
- * @param oldData
+ * @param oldRt
* @throws KeeperException
*/
- private void forceOffline(final HRegionInfo hri,
- final RegionTransitionData oldData)
+ private void forceOffline(final HRegionInfo hri, final RegionTransition oldRt)
throws KeeperException {
// If was on dead server, its closed now. Force to OFFLINE and then
// handle it like a close; this will get it reassigned if appropriate
- LOG.debug("RIT " + hri.getEncodedName() + " in state=" +
- oldData.getEventType() + " was on deadserver; forcing offline");
+ LOG.debug("RIT " + hri.getEncodedName() + " in state=" + oldRt.getEventType() +
+ " was on deadserver; forcing offline");
ZKAssign.createOrForceNodeOffline(this.watcher, hri,
this.master.getServerName());
- addToRITandCallClose(hri, RegionState.State.OFFLINE, oldData);
+ addToRITandCallClose(hri, RegionState.State.OFFLINE, oldRt);
}
/**
@@ -646,13 +641,23 @@ public class AssignmentManager extends ZooKeeperListener {
* @param oldData
*/
private void addToRITandCallClose(final HRegionInfo hri,
- final RegionState.State state, final RegionTransitionData oldData) {
- this.regionsInTransition.put(hri.getEncodedName(),
- new RegionState(hri, state, oldData.getStamp(), oldData.getOrigin()));
+ final RegionState.State state, final RegionTransition oldData) {
+ this.regionsInTransition.put(hri.getEncodedName(), getRegionState(hri, state, oldData));
new ClosedRegionHandler(this.master, this, hri).process();
}
/**
+ * @param hri
+ * @param state
+ * @param rt
+ * @return A new {@link RegionState} instance made of the passed arguments
+ */
+ RegionState getRegionState(final HRegionInfo hri, final RegionState.State state,
+ final RegionTransition rt) {
+ return new RegionState(hri, state, rt.getCreateTime(), rt.getServerName());
+ }
+
+ /**
* When a region is closed, it should be removed from the regionsToReopen
* @param hri HRegionInfo of the region which was closed
*/
@@ -689,41 +694,46 @@ public class AssignmentManager extends ZooKeeperListener {
*
* This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
* yet).
- * @param data
+ * @param rt
* @param expectedVersion
*/
- private void handleRegion(final RegionTransitionData data, int expectedVersion) {
+ private void handleRegion(final RegionTransition rt, int expectedVersion) {
synchronized(regionsInTransition) {
HRegionInfo hri = null;
- if (data == null || data.getOrigin() == null) {
- LOG.warn("Unexpected NULL input " + data);
+ if (rt == null) {
+ LOG.warn("Unexpected NULL input " + rt);
+ return;
+ }
+ final ServerName sn = rt.getServerName();
+ if (sn == null) {
+ LOG.warn("Null servername: " + rt);
return;
}
- ServerName sn = data.getOrigin();
// Check if this is a special HBCK transition
if (sn.equals(HConstants.HBCK_CODE_SERVERNAME)) {
- handleHBCK(data);
+ handleHBCK(rt);
return;
}
- String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
+ final long createTime = rt.getCreateTime();
+ final byte [] regionName = rt.getRegionName();
+ String encodedName = HRegionInfo.encodeRegionName(regionName);
String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
// Verify this is a known server
if (!serverManager.isServerOnline(sn) &&
!this.master.getServerName().equals(sn)
- && !ignoreStatesRSOffline.contains(data.getEventType())) {
+ && !ignoreStatesRSOffline.contains(rt.getEventType())) {
LOG.warn("Attempted to handle region transition for server but " +
"server is not online: " + prettyPrintedRegionName);
return;
}
// Printing if the event was created a long time ago helps debugging
- boolean lateEvent = data.getStamp() <
- (System.currentTimeMillis() - 15000);
- LOG.debug("Handling transition=" + data.getEventType() +
- ", server=" + data.getOrigin() + ", region=" +
+ boolean lateEvent = createTime < (System.currentTimeMillis() - 15000);
+ LOG.debug("Handling transition=" + rt.getEventType() +
+ ", server=" + sn + ", region=" +
(prettyPrintedRegionName == null? "null": prettyPrintedRegionName) +
(lateEvent? ", which is more than 15 seconds late" : ""));
RegionState regionState = regionsInTransition.get(encodedName);
- switch (data.getEventType()) {
+ switch (rt.getEventType()) {
case M_ZK_REGION_OFFLINE:
// Nothing to do.
break;
@@ -751,7 +761,7 @@ public class AssignmentManager extends ZooKeeperListener {
" but region was not first in SPLITTING state; continuing");
}
// Check it has daughters.
- byte [] payload = data.getPayload();
+ byte [] payload = rt.getPayload();
List<HRegionInfo> daughters = null;
try {
daughters = Writables.getHRegionInfos(payload, 0, payload.length);
@@ -772,10 +782,9 @@ public class AssignmentManager extends ZooKeeperListener {
break;
case M_ZK_REGION_CLOSING:
- hri = checkIfInFailover(regionState, encodedName, data);
+ hri = checkIfInFailover(regionState, encodedName, regionName);
if (hri != null) {
- regionState = new RegionState(hri, RegionState.State.CLOSING, data
- .getStamp(), data.getOrigin());
+ regionState = new RegionState(hri, RegionState.State.CLOSING, createTime, sn);
regionsInTransition.put(encodedName, regionState);
failoverProcessedRegions.put(encodedName, hri);
break;
@@ -785,21 +794,19 @@ public class AssignmentManager extends ZooKeeperListener {
if (regionState == null ||
(!regionState.isPendingClose() && !regionState.isClosing())) {
LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
- " from server " + data.getOrigin() + " but region was in " +
+ " from server " + sn + " but region was in " +
" the state " + regionState + " and not " +
"in expected PENDING_CLOSE or CLOSING states");
return;
}
// Transition to CLOSING (or update stamp if already CLOSING)
- regionState.update(RegionState.State.CLOSING,
- data.getStamp(), data.getOrigin());
+ regionState.update(RegionState.State.CLOSING, createTime, sn);
break;
case RS_ZK_REGION_CLOSED:
- hri = checkIfInFailover(regionState, encodedName, data);
+ hri = checkIfInFailover(regionState, encodedName, regionName);
if (hri != null) {
- regionState = new RegionState(hri, RegionState.State.CLOSED, data
- .getStamp(), data.getOrigin());
+ regionState = new RegionState(hri, RegionState.State.CLOSED, createTime, sn);
regionsInTransition.put(encodedName, regionState);
removeClosedRegion(regionState.getRegion());
new ClosedRegionHandler(master, this, regionState.getRegion())
@@ -811,7 +818,7 @@ public class AssignmentManager extends ZooKeeperListener {
if (regionState == null ||
(!regionState.isPendingClose() && !regionState.isClosing())) {
LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
- " from server " + data.getOrigin() + " but region was in " +
+ " from server " + sn + " but region was in " +
" the state " + regionState + " and not " +
"in expected PENDING_CLOSE or CLOSING states");
return;
@@ -819,18 +826,16 @@ public class AssignmentManager extends ZooKeeperListener {
// Handle CLOSED by assigning elsewhere or stopping if a disable
// If we got here all is good. Need to update RegionState -- else
// what follows will fail because not in expected state.
- regionState.update(RegionState.State.CLOSED,
- data.getStamp(), data.getOrigin());
+ regionState.update(RegionState.State.CLOSED, createTime, sn);
removeClosedRegion(regionState.getRegion());
this.executorService.submit(new ClosedRegionHandler(master,
this, regionState.getRegion()));
break;
case RS_ZK_REGION_FAILED_OPEN:
- hri = checkIfInFailover(regionState, encodedName, data);
+ hri = checkIfInFailover(regionState, encodedName, regionName);
if (hri != null) {
- regionState = new RegionState(hri, RegionState.State.CLOSED, data
- .getStamp(), data.getOrigin());
+ regionState = new RegionState(hri, RegionState.State.CLOSED, createTime, sn);
regionsInTransition.put(encodedName, regionState);
new ClosedRegionHandler(master, this, regionState.getRegion())
.process();
@@ -840,22 +845,20 @@ public class AssignmentManager extends ZooKeeperListener {
if (regionState == null ||
(!regionState.isPendingOpen() && !regionState.isOpening())) {
LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName +
- " from server " + data.getOrigin() + " but region was in " +
+ " from server " + sn + " but region was in " +
" the state " + regionState + " and not in PENDING_OPEN or OPENING");
return;
}
// Handle this the same as if it were opened and then closed.
- regionState.update(RegionState.State.CLOSED,
- data.getStamp(), data.getOrigin());
+ regionState.update(RegionState.State.CLOSED, createTime, sn);
this.executorService.submit(new ClosedRegionHandler(master,
this, regionState.getRegion()));
break;
case RS_ZK_REGION_OPENING:
- hri = checkIfInFailover(regionState, encodedName, data);
+ hri = checkIfInFailover(regionState, encodedName, regionName);
if (hri != null) {
- regionState = new RegionState(hri, RegionState.State.OPENING, data
- .getStamp(), data.getOrigin());
+ regionState = new RegionState(hri, RegionState.State.OPENING, createTime, sn);
regionsInTransition.put(encodedName, regionState);
failoverProcessedRegions.put(encodedName, hri);
break;
@@ -866,24 +869,21 @@ public class AssignmentManager extends ZooKeeperListener {
(!regionState.isPendingOpen() && !regionState.isOpening())) {
LOG.warn("Received OPENING for region " +
prettyPrintedRegionName +
- " from server " + data.getOrigin() + " but region was in " +
+ " from server " + sn + " but region was in " +
" the state " + regionState + " and not " +
"in expected PENDING_OPEN or OPENING states");
return;
}
// Transition to OPENING (or update stamp if already OPENING)
- regionState.update(RegionState.State.OPENING,
- data.getStamp(), data.getOrigin());
+ regionState.update(RegionState.State.OPENING, createTime, sn);
break;
case RS_ZK_REGION_OPENED:
- hri = checkIfInFailover(regionState, encodedName, data);
+ hri = checkIfInFailover(regionState, encodedName, regionName);
if (hri != null) {
- regionState = new RegionState(hri, RegionState.State.OPEN, data
- .getStamp(), data.getOrigin());
+ regionState = new RegionState(hri, RegionState.State.OPEN, createTime, sn);
regionsInTransition.put(encodedName, regionState);
- new OpenedRegionHandler(master, this, regionState.getRegion(), data
- .getOrigin(), expectedVersion).process();
+ new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion).process();
failoverProcessedRegions.put(encodedName, hri);
break;
}
@@ -892,17 +892,15 @@ public class AssignmentManager extends ZooKeeperListener {
(!regionState.isPendingOpen() && !regionState.isOpening())) {
LOG.warn("Received OPENED for region " +
prettyPrintedRegionName +
- " from server " + data.getOrigin() + " but region was in " +
+ " from server " + sn + " but region was in " +
" the state " + regionState + " and not " +
"in expected PENDING_OPEN or OPENING states");
return;
}
// Handle OPENED by removing from transition and deleted zk node
- regionState.update(RegionState.State.OPEN,
- data.getStamp(), data.getOrigin());
+ regionState.update(RegionState.State.OPEN, createTime, sn);
this.executorService.submit(
- new OpenedRegionHandler(master, this, regionState.getRegion(),
- data.getOrigin(), expectedVersion));
+ new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion));
break;
default:
@@ -920,12 +918,12 @@ public class AssignmentManager extends ZooKeeperListener {
* @return hri
*/
private HRegionInfo checkIfInFailover(RegionState regionState,
- String encodedName, RegionTransitionData data) {
+ String encodedName, final byte [] regionName) {
if (regionState == null && this.failover &&
(failoverProcessedRegions.containsKey(encodedName) == false ||
failoverProcessedRegions.get(encodedName) == null)) {
HRegionInfo hri = this.failoverProcessedRegions.get(encodedName);
- if (hri == null) hri = getHRegionInfo(data);
+ if (hri == null) hri = getHRegionInfo(regionName);
return hri;
}
return null;
@@ -933,18 +931,18 @@ public class AssignmentManager extends ZooKeeperListener {
/**
* Gets the HRegionInfo from the META table
- * @param data
+ * @param regionName
* @return HRegionInfo hri for the region
*/
- private HRegionInfo getHRegionInfo(RegionTransitionData data) {
+ private HRegionInfo getHRegionInfo(final byte [] regionName) {
Pair<HRegionInfo, ServerName> p = null;
try {
- p = MetaReader.getRegion(catalogTracker, data.getRegionName());
+ p = MetaReader.getRegion(catalogTracker, regionName);
if (p == null) return null;
return p.getFirst();
} catch (IOException e) {
- master.abort("Aborting because error occoured while reading "
- + Bytes.toStringBinary(data.getRegionName()) + " from .META.", e);
+ master.abort("Aborting because error occoured while reading " +
+ Bytes.toStringBinary(regionName) + " from .META.", e);
return null;
}
}
@@ -1042,22 +1040,22 @@ public class AssignmentManager extends ZooKeeperListener {
* Handle a ZK unassigned node transition triggered by HBCK repair tool.
*
* This is handled in a separate code path because it breaks the normal rules.
- * @param data
+ * @param rt
*/
- private void handleHBCK(RegionTransitionData data) {
- String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
- LOG.info("Handling HBCK triggered transition=" + data.getEventType() +
- ", server=" + data.getOrigin() + ", region=" +
+ private void handleHBCK(RegionTransition rt) {
+ String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
+ LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
+ ", server=" + rt.getServerName() + ", region=" +
HRegionInfo.prettyPrint(encodedName));
RegionState regionState = regionsInTransition.get(encodedName);
- switch (data.getEventType()) {
+ switch (rt.getEventType()) {
case M_ZK_REGION_OFFLINE:
HRegionInfo regionInfo = null;
if (regionState != null) {
regionInfo = regionState.getRegion();
} else {
try {
- byte[] name = data.getRegionName();
+ byte [] name = rt.getRegionName();
Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(catalogTracker, name);
regionInfo = p.getFirst();
} catch (IOException e) {
@@ -1072,8 +1070,7 @@ public class AssignmentManager extends ZooKeeperListener {
break;
default:
- LOG.warn("Received unexpected region state from HBCK (" +
- data.getEventType() + ")");
+ LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
break;
}
}
@@ -1094,18 +1091,7 @@ public class AssignmentManager extends ZooKeeperListener {
*/
@Override
public void nodeCreated(String path) {
- if(path.startsWith(watcher.assignmentZNode)) {
- try {
- Stat stat = new Stat();
- RegionTransitionData data = ZKAssign.getDataAndWatch(watcher, path, stat);
- if (data == null) {
- return;
- }
- handleRegion(data, stat.getVersion());
- } catch (KeeperException e) {
- master.abort("Unexpected ZK exception reading unassigned node data", e);
- }
- }
+ handleAssignmentEvent(path);
}
/**
@@ -1122,17 +1108,21 @@ public class AssignmentManager extends ZooKeeperListener {
*/
@Override
public void nodeDataChanged(String path) {
- if(path.startsWith(watcher.assignmentZNode)) {
- try {
- Stat stat = new Stat();
- RegionTransitionData data = ZKAssign.getDataAndWatch(watcher, path, stat);
- if (data == null) {
- return;
- }
- handleRegion(data, stat.getVersion());
- } catch (KeeperException e) {
- master.abort("Unexpected ZK exception reading unassigned node data", e);
- }
+ handleAssignmentEvent(path);
+ }
+
+ private void handleAssignmentEvent(final String path) {
+ if (!path.startsWith(watcher.assignmentZNode)) return;
+ try {
+ Stat stat = new Stat();
+ byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
+ if (data == null) return;
+ RegionTransition rt = RegionTransition.parseFrom(data);
+ handleRegion(rt, stat.getVersion());
+ } catch (KeeperException e) {
+ master.abort("Unexpected ZK exception reading unassigned node data", e);
+ } catch (DeserializationException e) {
+ master.abort("Unexpected exception deserializing node data", e);
}
}
@@ -1983,7 +1973,8 @@ public class AssignmentManager extends ZooKeeperListener {
+ "can't be created.");
return;
}
- } catch (KeeperException e) {
+ } catch (KeeperException ee) {
+ Exception e = ee;
if (e instanceof NodeExistsException) {
// Handle race between master initiated close and regionserver
// orchestrated splitting. See if existing node is in a
@@ -2004,6 +1995,8 @@ public class AssignmentManager extends ZooKeeperListener {
return;
} catch (KeeperException ke) {
LOG.error("Unexpected zk state", ke);
+ } catch (DeserializationException de) {
+ LOG.error("Failed parse", de);
}
}
// If we get here, don't understand whats going on -- abort.
@@ -2125,14 +2118,17 @@ public class AssignmentManager extends ZooKeeperListener {
* @param path
* @return True if znode is in SPLIT or SPLITTING state.
* @throws KeeperException Can happen if the znode went away in meantime.
+ * @throws DeserializationException
*/
- private boolean isSplitOrSplitting(final String path) throws KeeperException {
+ private boolean isSplitOrSplitting(final String path)
+ throws KeeperException, DeserializationException {
boolean result = false;
// This may fail if the SPLIT or SPLITTING znode gets cleaned up before we
// can get data from it.
- RegionTransitionData data = ZKAssign.getData(master.getZooKeeper(), path);
- EventType evt = data.getEventType();
- switch (evt) {
+ byte [] data = ZKAssign.getData(master.getZooKeeper(), path);
+ if (data == null) return false;
+ RegionTransition rt = RegionTransition.parseFrom(data);
+ switch (rt.getEventType()) {
case RS_ZK_REGION_SPLIT:
case RS_ZK_REGION_SPLITTING:
result = true;
@@ -2659,8 +2655,8 @@ public class AssignmentManager extends ZooKeeperListener {
* @throws KeeperException
*/
private void processDeadServersAndRecoverLostRegions(
- Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
- List<String> nodes) throws IOException, KeeperException {
+ Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers, List<String> nodes)
+ throws IOException, KeeperException {
if (null != deadServers) {
for (Map.Entry<ServerName, List<Pair<HRegionInfo, Result>>> deadServer :
deadServers.entrySet()) {
@@ -2671,17 +2667,25 @@ public class AssignmentManager extends ZooKeeperListener {
// If region was in transition (was in zk) force it offline for
// reassign
try {
- RegionTransitionData data = ZKAssign.getData(watcher,
- regionInfo.getEncodedName());
+ byte [] data = ZKAssign.getData(watcher, regionInfo.getEncodedName());
+ if (data == null) {
+ LOG.warn("No data in znode for " + regionInfo.getEncodedName());
+ continue;
+ }
+ RegionTransition rt;
+ try {
+ rt = RegionTransition.parseFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse of znode data for " + regionInfo.getEncodedName(), e);
+ continue;
+ }
// If zk node of this region has been updated by a live server,
// we consider that this region is being handled.
- // So we should skip it and process it in
- // processRegionsInTransition.
- if (data != null && data.getOrigin() != null &&
- serverManager.isServerOnline(data.getOrigin())) {
- LOG.info("The region " + regionInfo.getEncodedName()
- + "is being handled on " + data.getOrigin());
+ // So we should skip it and process it in processRegionsInTransition.
+ ServerName sn = rt.getServerName();
+ if (isServerOnline(sn)) {
+ LOG.info("The region " + regionInfo.getEncodedName() + " is being handled on " + sn);
continue;
}
// Process with existing RS shutdown code
@@ -2967,27 +2971,28 @@ public class AssignmentManager extends ZooKeeperListener {
try {
String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
Stat stat = new Stat();
- RegionTransitionData dataInZNode = ZKAssign.getDataNoWatch(watcher, node,
- stat);
- if (dataInZNode == null) {
+ byte [] data = ZKAssign.getDataNoWatch(watcher, node, stat);
+ if (data == null) {
LOG.warn("Data is null, node " + node + " no longer exists");
return;
}
- if (dataInZNode.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+ RegionTransition rt = RegionTransition.parseFrom(data);
+ EventType et = rt.getEventType();
+ if (et == EventType.RS_ZK_REGION_OPENED) {
LOG.debug("Region has transitioned to OPENED, allowing "
+ "watched event handlers to process");
return;
- } else if (dataInZNode.getEventType() != EventType.RS_ZK_REGION_OPENING &&
- dataInZNode.getEventType() != EventType.RS_ZK_REGION_FAILED_OPEN ) {
- LOG.warn("While timing out a region in state OPENING, "
- + "found ZK node in unexpected state: "
- + dataInZNode.getEventType());
+ } else if (et != EventType.RS_ZK_REGION_OPENING && et != EventType.RS_ZK_REGION_FAILED_OPEN ) {
+ LOG.warn("While timing out a region, found ZK node in unexpected state: " + et);
return;
}
invokeAssign(regionInfo);
} catch (KeeperException ke) {
LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
return;
+ } catch (DeserializationException e) {
+ LOG.error("Unexpected exception parsing CLOSING region", e);
+ return;
}
return;
}
@@ -3019,16 +3024,18 @@ public class AssignmentManager extends ZooKeeperListener {
* @return whether the serverName currently hosts the region
*/
public boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
- RegionTransitionData data = null;
+ RegionTransition rt = null;
try {
- data = ZKAssign.getData(master.getZooKeeper(), hri.getEncodedName());
+ byte [] data = ZKAssign.getData(master.getZooKeeper(), hri.getEncodedName());
+ if (data == null) return false;
+ rt = RegionTransition.parseFrom(data);
} catch (KeeperException e) {
- master.abort("Unexpected ZK exception reading unassigned node for region="
- + hri.getEncodedName(), e);
+ master.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
+ } catch (DeserializationException e) {
+ master.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
}
-
- ServerName addressFromZK = (data != null && data.getOrigin() != null) ?
- data.getOrigin() : null;
+
+ ServerName addressFromZK = rt.getServerName();
if (addressFromZK != null) {
// if we get something from ZK, we will use the data
boolean matchZK = (addressFromZK != null &&
@@ -3048,6 +3055,7 @@ public class AssignmentManager extends ZooKeeperListener {
return matchAM;
}
+
/**
* Process shutdown server removing any assignments.
* @param sn Server that went down.
@@ -3391,7 +3399,7 @@ public class AssignmentManager extends ZooKeeperListener {
* @return True if online.
*/
public boolean isServerOnline(ServerName serverName) {
- return this.serverManager.isServerOnline(serverName);
+ return serverName != null && this.serverManager.isServerOnline(serverName);
}
/**
* Shutdown the threadpool executor service
diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 81e9023..353d564 100644
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -1427,7 +1428,7 @@ Server {
List<String> backupMasterStrings;
try {
backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
- this.zooKeeper.backupMasterAddressesZNode);
+ this.zooKeeper.backupMasterAddressesZNode);
} catch (KeeperException e) {
LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
backupMasterStrings = new ArrayList<String>(0);
@@ -1436,9 +1437,17 @@ Server {
backupMasterStrings.size());
for (String s: backupMasterStrings) {
try {
- byte[] bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(this.zooKeeper.backupMasterAddressesZNode, s));
+ byte [] bytes =
+ ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(this.zooKeeper.backupMasterAddressesZNode, s));
if (bytes != null) {
- backupMasters.add(ZKUtil.znodeContentToServerName(bytes));
+ ServerName sn;
+ try {
+ sn = ServerName.parseFrom(bytes);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse, skipping registering backup server", e);
+ continue;
+ }
+ backupMasters.add(sn);
}
} catch (KeeperException e) {
LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
diff --git src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 919c65f..edfff8c 100644
--- src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -1,7 +1,5 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -19,14 +17,19 @@
*/
package org.apache.hadoop.hbase.master;
-import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
+import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Set;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -38,7 +41,10 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -59,36 +65,31 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
-
-import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.*;
-import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.*;
/**
* Distributes the task of log splitting to the available region servers.
* Coordination happens via zookeeper. For every log file that has to be split a
- * znode is created under /hbase/splitlog. SplitLogWorkers race to grab a task.
+ * znode is created under /hbase/splitlog. SplitLogWorkers race to grab a task.
*
- * SplitLogManager monitors the task znodes that it creates using the
+ * SplitLogManager monitors the task znodes that it creates using the
* timeoutMonitor thread. If a task's progress is slow then
- * resubmit(String, boolean) will take away the task from the owner
- * {@link SplitLogWorker} and the task will be
- * upforgrabs again. When the task is done then the task's znode is deleted by
- * SplitLogManager.
+ * {@link #resubmit(String, Task, ResubmitDirective)} will take away the task from the owner
+ * {@link SplitLogWorker} and the task will be up for grabs again. When the task is done then the
+ * task's znode is deleted by SplitLogManager.
*
- * Clients call {@link #splitLogDistributed(Path)} to split a region server's
+ * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
* log files. The caller thread waits in this method until all the log files
* have been split.
*
- * All the zookeeper calls made by this class are asynchronous. This is mainly
+ * <p>All the zookeeper calls made by this class are asynchronous. This is mainly
* to help reduce response time seen by the callers.
*
- * There is race in this design between the SplitLogManager and the
+ * <p>There is race in this design between the SplitLogManager and the
* SplitLogWorker. SplitLogManager might re-queue a task that has in reality
* already been completed by a SplitLogWorker. We rely on the idempotency of
* the log splitting task for correctness.
*
- * It is also assumed that every log splitting task is unique and once
+ * <p>It is also assumed that every log splitting task is unique and once
* completed (either with success or with error) it will be not be submitted
* again. If a task is resubmitted then there is a risk that old "delete task"
* can delete the re-submission.
@@ -97,8 +98,13 @@ import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.*
public class SplitLogManager extends ZooKeeperListener {
private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
+ public static final int DEFAULT_TIMEOUT = 25000; // 25 sec
+ public static final int DEFAULT_ZK_RETRIES = 3;
+ public static final int DEFAULT_MAX_RESUBMIT = 3;
+ public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
+
private final Stoppable stopper;
- private final String serverName;
+ private final ServerName serverName;
private final TaskFinisher taskFinisher;
private FileSystem fs;
private Configuration conf;
@@ -110,8 +116,7 @@ public class SplitLogManager extends ZooKeeperListener {
private long lastNodeCreateTime = Long.MAX_VALUE;
public boolean ignoreZKDeleteForTesting = false;
- private ConcurrentMap<String, Task> tasks =
- new ConcurrentHashMap<String, Task>();
+ private ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
private TimeoutMonitor timeoutMonitor;
private Set deadWorkers = null;
@@ -130,7 +135,7 @@ public class SplitLogManager extends ZooKeeperListener {
* @param serverName
*/
public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
- Stoppable stopper, String serverName) {
+ Stoppable stopper, ServerName serverName) {
this(zkw, conf, stopper, serverName, new TaskFinisher() {
@Override
public Status finish(String workerName, String logfile) {
@@ -159,20 +164,16 @@ public class SplitLogManager extends ZooKeeperListener {
* @param tf task finisher
*/
public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
- Stoppable stopper, String serverName, TaskFinisher tf) {
+ Stoppable stopper, ServerName serverName, TaskFinisher tf) {
super(zkw);
this.taskFinisher = tf;
this.conf = conf;
this.stopper = stopper;
- this.zkretries = conf.getLong("hbase.splitlog.zk.retries",
- ZKSplitLog.DEFAULT_ZK_RETRIES);
- this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit",
- ZKSplitLog.DEFAULT_MAX_RESUBMIT);
- this.timeout = conf.getInt("hbase.splitlog.manager.timeout",
- ZKSplitLog.DEFAULT_TIMEOUT);
+ this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
+ this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
+ this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
this.unassignedTimeout =
- conf.getInt("hbase.splitlog.manager.unassigned.timeout",
- ZKSplitLog.DEFAULT_UNASSIGNED_TIMEOUT);
+ conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
LOG.debug("timeout = " + timeout);
LOG.debug("unassigned timeout = " + unassignedTimeout);
@@ -244,7 +245,7 @@ public class SplitLogManager extends ZooKeeperListener {
FileStatus[] logfiles = getFileList(logDirs);
status.setStatus("Checking directory contents...");
LOG.debug("Scheduling batch of logs to split");
- tot_mgr_log_split_batch_start.incrementAndGet();
+ SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
LOG.info("started splitting logs in " + logDirs);
long t = EnvironmentEdgeManager.currentTimeMillis();
long totalSize = 0;
@@ -264,7 +265,7 @@ public class SplitLogManager extends ZooKeeperListener {
waitForSplittingCompletion(batch, status);
if (batch.done != batch.installed) {
batch.isDead = true;
- tot_mgr_log_split_batch_err.incrementAndGet();
+ SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
LOG.warn("error while splitting logs in " + logDirs +
" installed = " + batch.installed + " but only " + batch.done + " done");
throw new IOException("error or interrupt while splitting logs in "
@@ -285,7 +286,7 @@ public class SplitLogManager extends ZooKeeperListener {
LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
}
}
- tot_mgr_log_split_batch_success.incrementAndGet();
+ SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
}
String msg = "finished splitting (more than or equal to) " + totalSize +
" bytes in " + batch.installed + " log files in " + logDirs + " in " +
@@ -303,7 +304,7 @@ public class SplitLogManager extends ZooKeeperListener {
* @return true if a new entry is created, false if it is already there.
*/
boolean enqueueSplitTask(String taskname, TaskBatch batch) {
- tot_mgr_log_split_start.incrementAndGet();
+ SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
Task oldtask = createTaskIfAbsent(path, batch);
if (oldtask == null) {
@@ -340,17 +341,17 @@ public class SplitLogManager extends ZooKeeperListener {
Task task = tasks.get(path);
if (task == null) {
if (!ZKSplitLog.isRescanNode(watcher, path)) {
- tot_mgr_unacquired_orphan_done.incrementAndGet();
+ SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
LOG.debug("unacquired orphan task is done " + path);
}
} else {
synchronized (task) {
if (task.status == IN_PROGRESS) {
if (status == SUCCESS) {
- tot_mgr_log_split_success.incrementAndGet();
+ SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
LOG.info("Done splitting " + path);
} else {
- tot_mgr_log_split_err.incrementAndGet();
+ SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
LOG.warn("Error splitting " + path);
}
task.status = status;
@@ -376,10 +377,9 @@ public class SplitLogManager extends ZooKeeperListener {
}
private void createNode(String path, Long retry_count) {
- ZKUtil.asyncCreate(this.watcher, path,
- TaskState.TASK_UNASSIGNED.get(serverName), new CreateAsyncCallback(),
- retry_count);
- tot_mgr_node_create_queued.incrementAndGet();
+ SplitLogTask slt = new SplitLogTask.Unassigned(serverName);
+ ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
+ SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
return;
}
@@ -400,7 +400,7 @@ public class SplitLogManager extends ZooKeeperListener {
this.watcher.getRecoverableZooKeeper().getZooKeeper().
getData(path, this.watcher,
new GetDataAsyncCallback(), retry_count);
- tot_mgr_get_data_queued.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
}
private void tryGetDataSetWatch(String path) {
@@ -408,37 +408,36 @@ public class SplitLogManager extends ZooKeeperListener {
this.watcher.getRecoverableZooKeeper().getZooKeeper().
getData(path, this.watcher,
new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */);
- tot_mgr_get_data_queued.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
}
- private void getDataSetWatchSuccess(String path, byte[] data, int version) {
+ private void getDataSetWatchSuccess(String path, byte[] data, int version)
+ throws DeserializationException {
if (data == null) {
if (version == Integer.MIN_VALUE) {
// assume all done. The task znode suddenly disappeared.
setDone(path, SUCCESS);
return;
}
- tot_mgr_null_data.incrementAndGet();
+ SplitLogCounters.tot_mgr_null_data.incrementAndGet();
LOG.fatal("logic error - got null data " + path);
setDone(path, FAILURE);
return;
}
data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
- // LOG.debug("set watch on " + path + " got data " + new String(data));
- if (TaskState.TASK_UNASSIGNED.equals(data)) {
+ SplitLogTask slt = SplitLogTask.parseFrom(data);
+ if (slt.isUnassigned()) {
LOG.debug("task not yet acquired " + path + " ver = " + version);
handleUnassignedTask(path);
- } else if (TaskState.TASK_OWNED.equals(data)) {
- heartbeat(path, version,
- TaskState.TASK_OWNED.getWriterName(data));
- } else if (TaskState.TASK_RESIGNED.equals(data)) {
+ } else if (slt.isOwned()) {
+ heartbeat(path, version, slt.toString());
+ } else if (slt.isResigned()) {
LOG.info("task " + path + " entered state " + new String(data));
resubmitOrFail(path, FORCE);
- } else if (TaskState.TASK_DONE.equals(data)) {
+ } else if (slt.isDone()) {
LOG.info("task " + path + " entered state " + new String(data));
if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
- if (taskFinisher.finish(TaskState.TASK_DONE.getWriterName(data),
- ZKSplitLog.getFileName(path)) == Status.DONE) {
+ if (taskFinisher.finish(slt.toString(), ZKSplitLog.getFileName(path)) == Status.DONE) {
setDone(path, SUCCESS);
} else {
resubmitOrFail(path, CHECK);
@@ -446,12 +445,11 @@ public class SplitLogManager extends ZooKeeperListener {
} else {
setDone(path, SUCCESS);
}
- } else if (TaskState.TASK_ERR.equals(data)) {
+ } else if (slt.isErr()) {
LOG.info("task " + path + " entered state " + new String(data));
resubmitOrFail(path, CHECK);
} else {
- LOG.fatal("logic error - unexpected zk state for path = " + path
- + " data = " + new String(data));
+ LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + new String(data));
setDone(path, FAILURE);
}
}
@@ -483,8 +481,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
}
- private void heartbeat(String path, int new_version,
- String workerName) {
+ private void heartbeat(String path, int new_version, String workerName) {
Task task = findOrCreateOrphanTask(path);
if (new_version != task.last_version) {
if (task.isUnassigned()) {
@@ -492,7 +489,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(),
new_version, workerName);
- tot_mgr_heartbeat.incrementAndGet();
+ SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
} else {
// duplicate heartbeats - heartbeats w/o zk node version
// changing - are possible. The timeout thread does
@@ -502,8 +499,7 @@ public class SplitLogManager extends ZooKeeperListener {
return;
}
- private boolean resubmit(String path, Task task,
- ResubmitDirective directive) {
+ private boolean resubmit(String path, Task task, ResubmitDirective directive) {
// its ok if this thread misses the update to task.deleted. It will
// fail later
if (task.status != IN_PROGRESS) {
@@ -518,7 +514,7 @@ public class SplitLogManager extends ZooKeeperListener {
if (task.unforcedResubmits >= resubmit_threshold) {
if (!task.resubmitThresholdReached) {
task.resubmitThresholdReached = true;
- tot_mgr_resubmit_threshold_reached.incrementAndGet();
+ SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
LOG.info("Skipping resubmissions of task " + path +
" because threshold " + resubmit_threshold + " reached");
}
@@ -533,9 +529,8 @@ public class SplitLogManager extends ZooKeeperListener {
task.incarnation++;
try {
// blocking zk call but this is done from the timeout thread
- if (ZKUtil.setData(this.watcher, path,
- TaskState.TASK_UNASSIGNED.get(serverName),
- version) == false) {
+ SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName);
+ if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
LOG.debug("failed to resubmit task " + path +
" version changed");
task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
@@ -544,15 +539,20 @@ public class SplitLogManager extends ZooKeeperListener {
} catch (NoNodeException e) {
LOG.warn("failed to resubmit because znode doesn't exist " + path +
" task done (or forced done by removing the znode)");
- getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+ try {
+ getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+ } catch (DeserializationException e1) {
+ LOG.debug("failed to re-resubmit task " + path + " because of deserialization issue");
+ task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
+ return false;
+ }
return false;
} catch (KeeperException.BadVersionException e) {
- LOG.debug("failed to resubmit task " + path +
- " version changed");
+ LOG.debug("failed to resubmit task " + path + " version changed");
task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
return false;
} catch (KeeperException e) {
- tot_mgr_resubmit_failed.incrementAndGet();
+ SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
LOG.warn("failed to resubmit " + path, e);
return false;
}
@@ -562,7 +562,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
task.setUnassigned();
createRescanNode(Long.MAX_VALUE);
- tot_mgr_resubmit.incrementAndGet();
+ SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
return true;
}
@@ -573,7 +573,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
private void deleteNode(String path, Long retries) {
- tot_mgr_node_delete_queued.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
// Once a task znode is ready for delete, that is it is in the TASK_DONE
// state, then no one should be writing to it anymore. That is no one
// will be updating the znode version any more.
@@ -590,9 +590,9 @@ public class SplitLogManager extends ZooKeeperListener {
task = tasks.remove(path);
if (task == null) {
if (ZKSplitLog.isRescanNode(watcher, path)) {
- tot_mgr_rescan_deleted.incrementAndGet();
+ SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
}
- tot_mgr_missing_state_in_delete.incrementAndGet();
+ SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
LOG.debug("deleted task without in memory state " + path);
return;
}
@@ -600,7 +600,7 @@ public class SplitLogManager extends ZooKeeperListener {
task.status = DELETED;
task.notify();
}
- tot_mgr_task_deleted.incrementAndGet();
+ SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
}
private void deleteNodeFailure(String path) {
@@ -622,16 +622,16 @@ public class SplitLogManager extends ZooKeeperListener {
// might miss the watch-trigger that creation of RESCAN node provides.
// Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
// therefore this behavior is safe.
+ SplitLogTask slt = new SplitLogTask.Done(this.serverName);
this.watcher.getRecoverableZooKeeper().getZooKeeper().
- create(ZKSplitLog.getRescanNode(watcher),
- TaskState.TASK_DONE.get(serverName), Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL,
+ create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new CreateRescanAsyncCallback(), Long.valueOf(retries));
}
private void createRescanSuccess(String path) {
lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
- tot_mgr_rescan.incrementAndGet();
+ SplitLogCounters.tot_mgr_rescan.incrementAndGet();
getDataSetWatch(path, zkretries);
}
@@ -675,7 +675,7 @@ public class SplitLogManager extends ZooKeeperListener {
while (oldtask.status == FAILURE) {
LOG.debug("wait for status of task " + path +
" to change to DELETED");
- tot_mgr_wait_for_zk_delete.incrementAndGet();
+ SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
try {
oldtask.wait();
} catch (InterruptedException e) {
@@ -713,7 +713,7 @@ public class SplitLogManager extends ZooKeeperListener {
task = tasks.putIfAbsent(path, orphanTask);
if (task == null) {
LOG.info("creating orphan task " + path);
- tot_mgr_orphan_task_acquired.incrementAndGet();
+ SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
task = orphanTask;
}
return task;
@@ -905,7 +905,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
found_assigned_task = true;
if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
- tot_mgr_resubmit_dead_server_task.incrementAndGet();
+ SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
if (resubmit(path, task, FORCE)) {
resubmitted++;
} else {
@@ -948,7 +948,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
}
createRescanNode(Long.MAX_VALUE);
- tot_mgr_resubmit_unassigned.incrementAndGet();
+ SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
LOG.debug("resubmitting unassigned task(s) after timeout");
}
}
@@ -963,7 +963,7 @@ public class SplitLogManager extends ZooKeeperListener {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
- tot_mgr_node_create_result.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
if (rc != 0) {
if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
// What if there is a delete pending against this pre-existing
@@ -973,16 +973,16 @@ public class SplitLogManager extends ZooKeeperListener {
// And all code pieces correctly handle the case of suddenly
// disappearing task-znode.
LOG.debug("found pre-existing znode " + path);
- tot_mgr_node_already_exists.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
} else {
Long retry_count = (Long)ctx;
LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
path + " remaining retries=" + retry_count);
if (retry_count == 0) {
- tot_mgr_node_create_err.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
createNodeFailure(path);
} else {
- tot_mgr_node_create_retry.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
createNode(path, retry_count - 1);
}
return;
@@ -1002,19 +1002,23 @@ public class SplitLogManager extends ZooKeeperListener {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data,
Stat stat) {
- tot_mgr_get_data_result.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
if (rc != 0) {
if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries.");
return;
}
if (rc == KeeperException.Code.NONODE.intValue()) {
- tot_mgr_get_data_nonode.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
// The task znode has been deleted. Must be some pending delete
// that deleted the task. Assume success because a task-znode is
// is only deleted after TaskFinisher is successful.
LOG.warn("task znode " + path + " vanished.");
- getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+ try {
+ getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+ } catch (DeserializationException e) {
+ LOG.warn("Deserialization problem", e);
+ }
return;
}
Long retry_count = (Long) ctx;
@@ -1027,15 +1031,19 @@ public class SplitLogManager extends ZooKeeperListener {
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
path + " remaining retries=" + retry_count);
if (retry_count == 0) {
- tot_mgr_get_data_err.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
getDataSetWatchFailure(path);
} else {
- tot_mgr_get_data_retry.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
getDataSetWatch(path, retry_count - 1);
}
return;
}
- getDataSetWatchSuccess(path, data, stat.getVersion());
+ try {
+ getDataSetWatchSuccess(path, data, stat.getVersion());
+ } catch (DeserializationException e) {
+ LOG.warn("Deserialization problem", e);
+ }
return;
}
}
@@ -1049,10 +1057,10 @@ public class SplitLogManager extends ZooKeeperListener {
@Override
public void processResult(int rc, String path, Object ctx) {
- tot_mgr_node_delete_result.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
if (rc != 0) {
if (rc != KeeperException.Code.NONODE.intValue()) {
- tot_mgr_node_delete_err.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
Long retry_count = (Long) ctx;
LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
path + " remaining retries=" + retry_count);
@@ -1139,10 +1147,12 @@ public class SplitLogManager extends ZooKeeperListener {
*/
public Status finish(String workerName, String taskname);
}
+
enum ResubmitDirective {
CHECK(),
FORCE();
}
+
enum TerminationStatus {
IN_PROGRESS("in_progress"),
SUCCESS("success"),
diff --git src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 994cb76..226f4aa 100644
--- src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -136,6 +137,7 @@ public final class ProtobufUtil {
* for preamble.
*/
static final byte [] PB_MAGIC = new byte [] {'P', 'B', 'U', 'F'};
+ private static final String PB_MAGIC_STR = Bytes.toString(PB_MAGIC);
/**
* Prepend the passed bytes with four bytes of magic, {@link #PB_MAGIC}, to flag what
@@ -158,6 +160,16 @@ public final class ProtobufUtil {
}
/**
+ * @param bytes
+ * @throws DeserializationException if we are missing the pb magic prefix
+ */
+ public static void expectPBMagicPrefix(final byte [] bytes) throws DeserializationException {
+ if (!isPBMagicPrefix(bytes)) {
+ throw new DeserializationException("Missing pb magic " + PB_MAGIC_STR + " prefix");
+ }
+ }
+
+ /**
* @return Length of {@link #PB_MAGIC}
*/
public static int lengthOfPBMagic() {
@@ -231,6 +243,7 @@ public final class ProtobufUtil {
*
* @param serverName the ServerName to convert
* @return the converted protocol buffer ServerName
+ * @see #toServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName)
*/
public static HBaseProtos.ServerName
toServerName(final ServerName serverName) {
@@ -253,8 +266,7 @@ public final class ProtobufUtil {
* @param proto the protocol buffer ServerName to convert
* @return the converted ServerName
*/
- public static ServerName toServerName(
- final HBaseProtos.ServerName proto) {
+ public static ServerName toServerName(final HBaseProtos.ServerName proto) {
if (proto == null) return null;
String hostName = proto.getHostName();
long startCode = -1;
@@ -274,8 +286,7 @@ public final class ProtobufUtil {
* @param proto the RegionInfo to convert
* @return the converted HRegionInfo
*/
- public static HRegionInfo
- toRegionInfo(final RegionInfo proto) {
+ public static HRegionInfo toRegionInfo(final RegionInfo proto) {
if (proto == null) return null;
byte[] tableName = proto.getTableName().toByteArray();
if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
@@ -310,8 +321,7 @@ public final class ProtobufUtil {
* @param info the HRegionInfo to convert
* @return the converted RegionInfo
*/
- public static RegionInfo
- toRegionInfo(final HRegionInfo info) {
+ public static RegionInfo toRegionInfo(final HRegionInfo info) {
if (info == null) return null;
RegionInfo.Builder builder = RegionInfo.newBuilder();
builder.setTableName(ByteString.copyFrom(info.getTableName()));
diff --git src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 8457bdc..6be9b7c 100644
--- src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -1786,6 +1786,1403 @@ public final class ZooKeeperProtos {
// @@protoc_insertion_point(class_scope:ClusterUp)
}
+ public interface RegionTransitionOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // required uint32 eventTypeCode = 1;
+ boolean hasEventTypeCode();
+ int getEventTypeCode();
+
+ // required bytes regionName = 2;
+ boolean hasRegionName();
+ com.google.protobuf.ByteString getRegionName();
+
+ // required uint64 createTime = 3;
+ boolean hasCreateTime();
+ long getCreateTime();
+
+ // optional .ServerName originServerName = 4;
+ boolean hasOriginServerName();
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getOriginServerName();
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getOriginServerNameOrBuilder();
+
+ // optional bytes payload = 5;
+ boolean hasPayload();
+ com.google.protobuf.ByteString getPayload();
+ }
+ public static final class RegionTransition extends
+ com.google.protobuf.GeneratedMessage
+ implements RegionTransitionOrBuilder {
+ // Use RegionTransition.newBuilder() to construct.
+ private RegionTransition(Builder builder) {
+ super(builder);
+ }
+ private RegionTransition(boolean noInit) {}
+
+ private static final RegionTransition defaultInstance;
+ public static RegionTransition getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public RegionTransition getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_RegionTransition_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_RegionTransition_fieldAccessorTable;
+ }
+
+ private int bitField0_;
+ // required uint32 eventTypeCode = 1;
+ public static final int EVENTTYPECODE_FIELD_NUMBER = 1;
+ private int eventTypeCode_;
+ public boolean hasEventTypeCode() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public int getEventTypeCode() {
+ return eventTypeCode_;
+ }
+
+ // required bytes regionName = 2;
+ public static final int REGIONNAME_FIELD_NUMBER = 2;
+ private com.google.protobuf.ByteString regionName_;
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+
+ // required uint64 createTime = 3;
+ public static final int CREATETIME_FIELD_NUMBER = 3;
+ private long createTime_;
+ public boolean hasCreateTime() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public long getCreateTime() {
+ return createTime_;
+ }
+
+ // optional .ServerName originServerName = 4;
+ public static final int ORIGINSERVERNAME_FIELD_NUMBER = 4;
+ private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName originServerName_;
+ public boolean hasOriginServerName() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getOriginServerName() {
+ return originServerName_;
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getOriginServerNameOrBuilder() {
+ return originServerName_;
+ }
+
+ // optional bytes payload = 5;
+ public static final int PAYLOAD_FIELD_NUMBER = 5;
+ private com.google.protobuf.ByteString payload_;
+ public boolean hasPayload() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public com.google.protobuf.ByteString getPayload() {
+ return payload_;
+ }
+
+ private void initFields() {
+ eventTypeCode_ = 0;
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
+ createTime_ = 0L;
+ originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ payload_ = com.google.protobuf.ByteString.EMPTY;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasEventTypeCode()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasRegionName()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasCreateTime()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (hasOriginServerName()) {
+ if (!getOriginServerName().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeUInt32(1, eventTypeCode_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, regionName_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeUInt64(3, createTime_);
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeMessage(4, originServerName_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBytes(5, payload_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(1, eventTypeCode_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, regionName_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt64Size(3, createTime_);
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(4, originServerName_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(5, payload_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition other = (org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition) obj;
+
+ boolean result = true;
+ result = result && (hasEventTypeCode() == other.hasEventTypeCode());
+ if (hasEventTypeCode()) {
+ result = result && (getEventTypeCode()
+ == other.getEventTypeCode());
+ }
+ result = result && (hasRegionName() == other.hasRegionName());
+ if (hasRegionName()) {
+ result = result && getRegionName()
+ .equals(other.getRegionName());
+ }
+ result = result && (hasCreateTime() == other.hasCreateTime());
+ if (hasCreateTime()) {
+ result = result && (getCreateTime()
+ == other.getCreateTime());
+ }
+ result = result && (hasOriginServerName() == other.hasOriginServerName());
+ if (hasOriginServerName()) {
+ result = result && getOriginServerName()
+ .equals(other.getOriginServerName());
+ }
+ result = result && (hasPayload() == other.hasPayload());
+ if (hasPayload()) {
+ result = result && getPayload()
+ .equals(other.getPayload());
+ }
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ @java.lang.Override
+ public int hashCode() {
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (hasEventTypeCode()) {
+ hash = (37 * hash) + EVENTTYPECODE_FIELD_NUMBER;
+ hash = (53 * hash) + getEventTypeCode();
+ }
+ if (hasRegionName()) {
+ hash = (37 * hash) + REGIONNAME_FIELD_NUMBER;
+ hash = (53 * hash) + getRegionName().hashCode();
+ }
+ if (hasCreateTime()) {
+ hash = (37 * hash) + CREATETIME_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(getCreateTime());
+ }
+ if (hasOriginServerName()) {
+ hash = (37 * hash) + ORIGINSERVERNAME_FIELD_NUMBER;
+ hash = (53 * hash) + getOriginServerName().hashCode();
+ }
+ if (hasPayload()) {
+ hash = (37 * hash) + PAYLOAD_FIELD_NUMBER;
+ hash = (53 * hash) + getPayload().hashCode();
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ return hash;
+ }
+
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder
+ implements org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransitionOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_RegionTransition_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_RegionTransition_fieldAccessorTable;
+ }
+
+ // Construct using org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ getOriginServerNameFieldBuilder();
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ eventTypeCode_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ createTime_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ if (originServerNameBuilder_ == null) {
+ originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ } else {
+ originServerNameBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000008);
+ payload_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.getDescriptor();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition getDefaultInstanceForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition build() {
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition buildPartial() {
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition result = new org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.eventTypeCode_ = eventTypeCode_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.regionName_ = regionName_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.createTime_ = createTime_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ if (originServerNameBuilder_ == null) {
+ result.originServerName_ = originServerName_;
+ } else {
+ result.originServerName_ = originServerNameBuilder_.build();
+ }
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.payload_ = payload_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition) {
+ return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition other) {
+ if (other == org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.getDefaultInstance()) return this;
+ if (other.hasEventTypeCode()) {
+ setEventTypeCode(other.getEventTypeCode());
+ }
+ if (other.hasRegionName()) {
+ setRegionName(other.getRegionName());
+ }
+ if (other.hasCreateTime()) {
+ setCreateTime(other.getCreateTime());
+ }
+ if (other.hasOriginServerName()) {
+ mergeOriginServerName(other.getOriginServerName());
+ }
+ if (other.hasPayload()) {
+ setPayload(other.getPayload());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasEventTypeCode()) {
+
+ return false;
+ }
+ if (!hasRegionName()) {
+
+ return false;
+ }
+ if (!hasCreateTime()) {
+
+ return false;
+ }
+ if (hasOriginServerName()) {
+ if (!getOriginServerName().isInitialized()) {
+
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder(
+ this.getUnknownFields());
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ }
+ break;
+ }
+ case 8: {
+ bitField0_ |= 0x00000001;
+ eventTypeCode_ = input.readUInt32();
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000002;
+ regionName_ = input.readBytes();
+ break;
+ }
+ case 24: {
+ bitField0_ |= 0x00000004;
+ createTime_ = input.readUInt64();
+ break;
+ }
+ case 34: {
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder();
+ if (hasOriginServerName()) {
+ subBuilder.mergeFrom(getOriginServerName());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setOriginServerName(subBuilder.buildPartial());
+ break;
+ }
+ case 42: {
+ bitField0_ |= 0x00000010;
+ payload_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required uint32 eventTypeCode = 1;
+ private int eventTypeCode_ ;
+ public boolean hasEventTypeCode() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public int getEventTypeCode() {
+ return eventTypeCode_;
+ }
+ public Builder setEventTypeCode(int value) {
+ bitField0_ |= 0x00000001;
+ eventTypeCode_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearEventTypeCode() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ eventTypeCode_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // required bytes regionName = 2;
+ private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+ public Builder setRegionName(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ regionName_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearRegionName() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ regionName_ = getDefaultInstance().getRegionName();
+ onChanged();
+ return this;
+ }
+
+ // required uint64 createTime = 3;
+ private long createTime_ ;
+ public boolean hasCreateTime() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public long getCreateTime() {
+ return createTime_;
+ }
+ public Builder setCreateTime(long value) {
+ bitField0_ |= 0x00000004;
+ createTime_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearCreateTime() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ createTime_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // optional .ServerName originServerName = 4;
+ private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> originServerNameBuilder_;
+ public boolean hasOriginServerName() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getOriginServerName() {
+ if (originServerNameBuilder_ == null) {
+ return originServerName_;
+ } else {
+ return originServerNameBuilder_.getMessage();
+ }
+ }
+ public Builder setOriginServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
+ if (originServerNameBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ originServerName_ = value;
+ onChanged();
+ } else {
+ originServerNameBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000008;
+ return this;
+ }
+ public Builder setOriginServerName(
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder builderForValue) {
+ if (originServerNameBuilder_ == null) {
+ originServerName_ = builderForValue.build();
+ onChanged();
+ } else {
+ originServerNameBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000008;
+ return this;
+ }
+ public Builder mergeOriginServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
+ if (originServerNameBuilder_ == null) {
+ if (((bitField0_ & 0x00000008) == 0x00000008) &&
+ originServerName_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) {
+ originServerName_ =
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(originServerName_).mergeFrom(value).buildPartial();
+ } else {
+ originServerName_ = value;
+ }
+ onChanged();
+ } else {
+ originServerNameBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000008;
+ return this;
+ }
+ public Builder clearOriginServerName() {
+ if (originServerNameBuilder_ == null) {
+ originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ onChanged();
+ } else {
+ originServerNameBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000008);
+ return this;
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getOriginServerNameBuilder() {
+ bitField0_ |= 0x00000008;
+ onChanged();
+ return getOriginServerNameFieldBuilder().getBuilder();
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getOriginServerNameOrBuilder() {
+ if (originServerNameBuilder_ != null) {
+ return originServerNameBuilder_.getMessageOrBuilder();
+ } else {
+ return originServerName_;
+ }
+ }
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>
+ getOriginServerNameFieldBuilder() {
+ if (originServerNameBuilder_ == null) {
+ originServerNameBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>(
+ originServerName_,
+ getParentForChildren(),
+ isClean());
+ originServerName_ = null;
+ }
+ return originServerNameBuilder_;
+ }
+
+ // optional bytes payload = 5;
+ private com.google.protobuf.ByteString payload_ = com.google.protobuf.ByteString.EMPTY;
+ public boolean hasPayload() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public com.google.protobuf.ByteString getPayload() {
+ return payload_;
+ }
+ public Builder setPayload(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000010;
+ payload_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearPayload() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ payload_ = getDefaultInstance().getPayload();
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:RegionTransition)
+ }
+
+ static {
+ defaultInstance = new RegionTransition(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:RegionTransition)
+ }
+
+ public interface SplitLogTaskOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // required .SplitLogTask.State state = 1;
+ boolean hasState();
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State getState();
+
+ // required .ServerName serverName = 2;
+ boolean hasServerName();
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getServerName();
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder();
+ }
+ public static final class SplitLogTask extends
+ com.google.protobuf.GeneratedMessage
+ implements SplitLogTaskOrBuilder {
+ // Use SplitLogTask.newBuilder() to construct.
+ private SplitLogTask(Builder builder) {
+ super(builder);
+ }
+ private SplitLogTask(boolean noInit) {}
+
+ private static final SplitLogTask defaultInstance;
+ public static SplitLogTask getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public SplitLogTask getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_SplitLogTask_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_SplitLogTask_fieldAccessorTable;
+ }
+
+ public enum State
+ implements com.google.protobuf.ProtocolMessageEnum {
+ UNASSIGNED(0, 0),
+ OWNED(1, 1),
+ RESIGNED(2, 2),
+ DONE(3, 3),
+ ERR(4, 4),
+ ;
+
+ public static final int UNASSIGNED_VALUE = 0;
+ public static final int OWNED_VALUE = 1;
+ public static final int RESIGNED_VALUE = 2;
+ public static final int DONE_VALUE = 3;
+ public static final int ERR_VALUE = 4;
+
+
+ public final int getNumber() { return value; }
+
+ public static State valueOf(int value) {
+ switch (value) {
+ case 0: return UNASSIGNED;
+ case 1: return OWNED;
+ case 2: return RESIGNED;
+ case 3: return DONE;
+ case 4: return ERR;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap() {
+ public State findValueByNumber(int number) {
+ return State.valueOf(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index);
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDescriptor().getEnumTypes().get(0);
+ }
+
+ private static final State[] VALUES = {
+ UNASSIGNED, OWNED, RESIGNED, DONE, ERR,
+ };
+
+ public static State valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index;
+ private final int value;
+
+ private State(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:SplitLogTask.State)
+ }
+
+ private int bitField0_;
+ // required .SplitLogTask.State state = 1;
+ public static final int STATE_FIELD_NUMBER = 1;
+ private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State state_;
+ public boolean hasState() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State getState() {
+ return state_;
+ }
+
+ // required .ServerName serverName = 2;
+ public static final int SERVERNAME_FIELD_NUMBER = 2;
+ private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName serverName_;
+ public boolean hasServerName() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getServerName() {
+ return serverName_;
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder() {
+ return serverName_;
+ }
+
+ private void initFields() {
+ state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED;
+ serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasState()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasServerName()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!getServerName().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeEnum(1, state_.getNumber());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeMessage(2, serverName_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeEnumSize(1, state_.getNumber());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(2, serverName_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask other = (org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask) obj;
+
+ boolean result = true;
+ result = result && (hasState() == other.hasState());
+ if (hasState()) {
+ result = result &&
+ (getState() == other.getState());
+ }
+ result = result && (hasServerName() == other.hasServerName());
+ if (hasServerName()) {
+ result = result && getServerName()
+ .equals(other.getServerName());
+ }
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ @java.lang.Override
+ public int hashCode() {
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (hasState()) {
+ hash = (37 * hash) + STATE_FIELD_NUMBER;
+ hash = (53 * hash) + hashEnum(getState());
+ }
+ if (hasServerName()) {
+ hash = (37 * hash) + SERVERNAME_FIELD_NUMBER;
+ hash = (53 * hash) + getServerName().hashCode();
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ return hash;
+ }
+
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder
+ implements org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTaskOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_SplitLogTask_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_SplitLogTask_fieldAccessorTable;
+ }
+
+ // Construct using org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ getServerNameFieldBuilder();
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ if (serverNameBuilder_ == null) {
+ serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ } else {
+ serverNameBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDescriptor();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask getDefaultInstanceForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask build() {
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask buildPartial() {
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask result = new org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.state_ = state_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ if (serverNameBuilder_ == null) {
+ result.serverName_ = serverName_;
+ } else {
+ result.serverName_ = serverNameBuilder_.build();
+ }
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask) {
+ return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask other) {
+ if (other == org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDefaultInstance()) return this;
+ if (other.hasState()) {
+ setState(other.getState());
+ }
+ if (other.hasServerName()) {
+ mergeServerName(other.getServerName());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasState()) {
+
+ return false;
+ }
+ if (!hasServerName()) {
+
+ return false;
+ }
+ if (!getServerName().isInitialized()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder(
+ this.getUnknownFields());
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ }
+ break;
+ }
+ case 8: {
+ int rawValue = input.readEnum();
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State value = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.valueOf(rawValue);
+ if (value == null) {
+ unknownFields.mergeVarintField(1, rawValue);
+ } else {
+ bitField0_ |= 0x00000001;
+ state_ = value;
+ }
+ break;
+ }
+ case 18: {
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder();
+ if (hasServerName()) {
+ subBuilder.mergeFrom(getServerName());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setServerName(subBuilder.buildPartial());
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required .SplitLogTask.State state = 1;
+ private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED;
+ public boolean hasState() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State getState() {
+ return state_;
+ }
+ public Builder setState(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ state_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearState() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED;
+ onChanged();
+ return this;
+ }
+
+ // required .ServerName serverName = 2;
+ private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> serverNameBuilder_;
+ public boolean hasServerName() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getServerName() {
+ if (serverNameBuilder_ == null) {
+ return serverName_;
+ } else {
+ return serverNameBuilder_.getMessage();
+ }
+ }
+ public Builder setServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
+ if (serverNameBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ serverName_ = value;
+ onChanged();
+ } else {
+ serverNameBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ public Builder setServerName(
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder builderForValue) {
+ if (serverNameBuilder_ == null) {
+ serverName_ = builderForValue.build();
+ onChanged();
+ } else {
+ serverNameBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ public Builder mergeServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
+ if (serverNameBuilder_ == null) {
+ if (((bitField0_ & 0x00000002) == 0x00000002) &&
+ serverName_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) {
+ serverName_ =
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(serverName_).mergeFrom(value).buildPartial();
+ } else {
+ serverName_ = value;
+ }
+ onChanged();
+ } else {
+ serverNameBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ public Builder clearServerName() {
+ if (serverNameBuilder_ == null) {
+ serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ onChanged();
+ } else {
+ serverNameBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getServerNameBuilder() {
+ bitField0_ |= 0x00000002;
+ onChanged();
+ return getServerNameFieldBuilder().getBuilder();
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder() {
+ if (serverNameBuilder_ != null) {
+ return serverNameBuilder_.getMessageOrBuilder();
+ } else {
+ return serverName_;
+ }
+ }
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>
+ getServerNameFieldBuilder() {
+ if (serverNameBuilder_ == null) {
+ serverNameBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>(
+ serverName_,
+ getParentForChildren(),
+ isClean());
+ serverName_ = null;
+ }
+ return serverNameBuilder_;
+ }
+
+ // @@protoc_insertion_point(builder_scope:SplitLogTask)
+ }
+
+ static {
+ defaultInstance = new SplitLogTask(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:SplitLogTask)
+ }
+
private static com.google.protobuf.Descriptors.Descriptor
internal_static_RootRegionServer_descriptor;
private static
@@ -1806,6 +3203,16 @@ public final class ZooKeeperProtos {
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_ClusterUp_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_RegionTransition_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_RegionTransition_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_SplitLogTask_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_SplitLogTask_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@@ -1819,9 +3226,16 @@ public final class ZooKeeperProtos {
"gionServer\022\033\n\006server\030\001 \002(\0132\013.ServerName\"" +
"%\n\006Master\022\033\n\006master\030\001 \002(\0132\013.ServerName\"\036" +
"\n\tClusterId\022\021\n\tclusterId\030\001 \002(\t\"\036\n\tCluste" +
- "rUp\022\021\n\tstartDate\030\001 \002(\tBE\n*org.apache.had" +
- "oop.hbase.protobuf.generatedB\017ZooKeeperP" +
- "rotosH\001\210\001\001\240\001\001"
+ "rUp\022\021\n\tstartDate\030\001 \002(\t\"\211\001\n\020RegionTransit" +
+ "ion\022\025\n\reventTypeCode\030\001 \002(\r\022\022\n\nregionName" +
+ "\030\002 \002(\014\022\022\n\ncreateTime\030\003 \002(\004\022%\n\020originServ" +
+ "erName\030\004 \001(\0132\013.ServerName\022\017\n\007payload\030\005 \001" +
+ "(\014\"\230\001\n\014SplitLogTask\022\"\n\005state\030\001 \002(\0162\023.Spl" +
+ "itLogTask.State\022\037\n\nserverName\030\002 \002(\0132\013.Se",
+ "rverName\"C\n\005State\022\016\n\nUNASSIGNED\020\000\022\t\n\005OWN" +
+ "ED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004BE\n" +
+ "*org.apache.hadoop.hbase.protobuf.genera" +
+ "tedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1860,6 +3274,22 @@ public final class ZooKeeperProtos {
new java.lang.String[] { "StartDate", },
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ClusterUp.class,
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ClusterUp.Builder.class);
+ internal_static_RegionTransition_descriptor =
+ getDescriptor().getMessageTypes().get(4);
+ internal_static_RegionTransition_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_RegionTransition_descriptor,
+ new java.lang.String[] { "EventTypeCode", "RegionName", "CreateTime", "OriginServerName", "Payload", },
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.class,
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.Builder.class);
+ internal_static_SplitLogTask_descriptor =
+ getDescriptor().getMessageTypes().get(5);
+ internal_static_SplitLogTask_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_SplitLogTask_descriptor,
+ new java.lang.String[] { "State", "ServerName", },
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.class,
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.Builder.class);
return null;
}
};
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 8ea342f..bc505e4 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -19,8 +19,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
-
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
@@ -32,12 +30,14 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.DeserializationException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -71,7 +71,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
Thread worker;
- private final String serverName;
+ private final ServerName serverName;
private final TaskExecutor splitTaskExecutor;
private long zkretries;
@@ -85,7 +85,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
- String serverName, TaskExecutor splitTaskExecutor) {
+ ServerName serverName, TaskExecutor splitTaskExecutor) {
super(watcher);
this.serverName = serverName;
this.splitTaskExecutor = splitTaskExecutor;
@@ -93,7 +93,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
}
public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf,
- final String serverName) {
+ final ServerName serverName) {
this(watcher, conf, serverName, new TaskExecutor () {
@Override
public Status exec(String filename, CancelableProgressable p) {
@@ -111,7 +111,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
// encountered a bad non-retry-able persistent error.
try {
String tmpname =
- ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename);
+ ZKSplitLog.getSplitLogDirTmpComponent(serverName.toString(), filename);
if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
fs.getFileStatus(new Path(filename)), fs, conf, p) == false) {
return Status.PREEMPTED;
@@ -241,31 +241,40 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
try {
try {
if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
- tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
+ SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
return;
}
} catch (KeeperException e) {
LOG.warn("Failed to get data for znode " + path, e);
- tot_wkr_failed_to_grab_task_exception.incrementAndGet();
+ SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
+ return;
+ }
+ SplitLogTask slt;
+ try {
+ slt = SplitLogTask.parseFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse data for znode " + path, e);
+ SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
return;
}
- if (TaskState.TASK_UNASSIGNED.equals(data) == false) {
- tot_wkr_failed_to_grab_task_owned.incrementAndGet();
+ if (slt.isUnassigned() == false) {
+ SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
return;
}
currentVersion = stat.getVersion();
if (attemptToOwnTask(true) == false) {
- tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
+ SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
return;
}
if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
- endTask(TaskState.TASK_DONE, tot_wkr_task_acquired_rescan);
+ endTask(new SplitLogTask.Done(this.serverName),
+ SplitLogCounters.tot_wkr_task_acquired_rescan);
return;
}
LOG.info("worker " + serverName + " acquired task " + path);
- tot_wkr_task_acquired.incrementAndGet();
+ SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
getDataSetWatchAsync();
t = System.currentTimeMillis();
@@ -285,15 +294,17 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
});
switch (status) {
case DONE:
- endTask(TaskState.TASK_DONE, tot_wkr_task_done);
+ endTask(new SplitLogTask.Done(this.serverName),
+ SplitLogCounters.tot_wkr_task_done);
break;
case PREEMPTED:
- tot_wkr_preempt_task.incrementAndGet();
+ SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
LOG.warn("task execution prempted " + path);
break;
case ERR:
if (!exitWorker) {
- endTask(TaskState.TASK_ERR, tot_wkr_task_err);
+ endTask(new SplitLogTask.Err(this.serverName),
+ SplitLogCounters.tot_wkr_task_err);
break;
}
// if the RS is exiting then there is probably a tons of stuff
@@ -303,9 +314,10 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
if (exitWorker) {
LOG.info("task execution interrupted because worker is exiting " +
path);
- endTask(TaskState.TASK_RESIGNED, tot_wkr_task_resigned);
+ endTask(new SplitLogTask.Resigned(this.serverName),
+ SplitLogCounters.tot_wkr_task_resigned);
} else {
- tot_wkr_preempt_task.incrementAndGet();
+ SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
LOG.info("task execution interrupted via zk by manager " +
path);
}
@@ -337,15 +349,16 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
*/
private boolean attemptToOwnTask(boolean isFirstTime) {
try {
- Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask,
- TaskState.TASK_OWNED.get(serverName), currentVersion);
+ SplitLogTask slt = new SplitLogTask.Owned(this.serverName);
+ Stat stat =
+ this.watcher.getRecoverableZooKeeper().setData(currentTask, slt.toByteArray(), currentVersion);
if (stat == null) {
LOG.warn("zk.setData() returned null for path " + currentTask);
- tot_wkr_task_heartbeat_failed.incrementAndGet();
+ SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
return (false);
}
currentVersion = stat.getVersion();
- tot_wkr_task_heartbeat.incrementAndGet();
+ SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
return (true);
} catch (KeeperException e) {
if (!isFirstTime) {
@@ -363,7 +376,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
currentTask + " " + StringUtils.stringifyException(e1));
Thread.currentThread().interrupt();
}
- tot_wkr_task_heartbeat_failed.incrementAndGet();
+ SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
return (false);
}
@@ -373,29 +386,28 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
* @param ts
* @param ctr
*/
- private void endTask(ZKSplitLog.TaskState ts, AtomicLong ctr) {
+ private void endTask(SplitLogTask slt, AtomicLong ctr) {
String path = currentTask;
currentTask = null;
try {
- if (ZKUtil.setData(this.watcher, path, ts.get(serverName),
+ if (ZKUtil.setData(this.watcher, path, slt.toByteArray(),
currentVersion)) {
- LOG.info("successfully transitioned task " + path +
- " to final state " + ts);
+ LOG.info("successfully transitioned task " + path + " to final state " + slt);
ctr.incrementAndGet();
return;
}
- LOG.warn("failed to transistion task " + path + " to end state " + ts +
+ LOG.warn("failed to transistion task " + path + " to end state " + slt +
" because of version mismatch ");
} catch (KeeperException.BadVersionException bve) {
- LOG.warn("transisition task " + path + " to " + ts +
+ LOG.warn("transisition task " + path + " to " + slt +
" failed because of version mismatch", bve);
} catch (KeeperException.NoNodeException e) {
- LOG.fatal("logic error - end task " + path + " " + ts +
+ LOG.fatal("logic error - end task " + path + " " + slt +
" failed because task doesn't exist", e);
} catch (KeeperException e) {
- LOG.warn("failed to end task, " + path + " " + ts, e);
+ LOG.warn("failed to end task, " + path + " " + slt, e);
}
- tot_wkr_final_transistion_failed.incrementAndGet();
+ SplitLogCounters.tot_wkr_final_transistion_failed.incrementAndGet();
return;
}
@@ -403,10 +415,17 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
this.watcher.getRecoverableZooKeeper().getZooKeeper().
getData(currentTask, this.watcher,
new GetDataAsyncCallback(), null);
- tot_wkr_get_data_queued.incrementAndGet();
+ SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
}
void getDataSetWatchSuccess(String path, byte[] data) {
+ SplitLogTask slt;
+ try {
+ slt = SplitLogTask.parseFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse", e);
+ return;
+ }
synchronized (grabTaskLock) {
if (workerInGrabTask) {
// currentTask can change but that's ok
@@ -418,13 +437,12 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
// UNASSIGNED because by the time this worker sets the data watch
// the node might have made two transitions - from owned by this
// worker to unassigned to owned by another worker
- if (! TaskState.TASK_OWNED.equals(data, serverName) &&
- ! TaskState.TASK_DONE.equals(data, serverName) &&
- ! TaskState.TASK_ERR.equals(data, serverName) &&
- ! TaskState.TASK_RESIGNED.equals(data, serverName)) {
+ if (! slt.isOwned(this.serverName) &&
+ ! slt.isDone(this.serverName) &&
+ ! slt.isErr(this.serverName) &&
+ ! slt.isResigned(this.serverName)) {
LOG.info("task " + taskpath + " preempted from " +
- serverName + ", current task state and owner=" +
- new String(data));
+ serverName + ", current task state and owner=" + slt.toString());
stopTask();
}
}
@@ -439,7 +457,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
LOG.info("retrying data watch on " + path);
- tot_wkr_get_data_retry.incrementAndGet();
+ SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
getDataSetWatchAsync();
} else {
// no point setting a watch on the task which this worker is not
@@ -543,9 +561,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
@Override
- public void processResult(int rc, String path, Object ctx, byte[] data,
- Stat stat) {
- tot_wkr_get_data_result.incrementAndGet();
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
if (rc != 0) {
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
getDataSetWatchFailure(path);
@@ -573,4 +590,4 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
}
public Status exec(String name, CancelableProgressable p);
}
-}
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index ea12da4..231e2d6 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -39,11 +39,11 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -858,12 +858,10 @@ public class SplitTransaction {
throws KeeperException, IOException {
LOG.debug(zkw.prefix("Creating ephemeral node for " +
region.getEncodedName() + " in SPLITTING state"));
- RegionTransitionData data =
- new RegionTransitionData(EventType.RS_ZK_REGION_SPLITTING,
+ RegionTransition rt = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_SPLITTING,
region.getRegionName(), serverName);
-
String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
- if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, data.getBytes())) {
+ if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
throw new IOException("Failed create of ephemeral " + node);
}
// Transition node from SPLITTING to SPLITTING and pick up version so we
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 587386c..bafa069 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -375,7 +376,7 @@ public class HLogSplitter {
// How often to send a progress report (default 1/2 the zookeeper session
// timeout of if that not set, the split log DEFAULT_TIMEOUT)
int period = conf.getInt("hbase.splitlog.report.period",
- conf.getInt("hbase.splitlog.manager.timeout", ZKSplitLog.DEFAULT_TIMEOUT) / 2);
+ conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 2);
int numOpenedFilesBeforeReporting =
conf.getInt("hbase.splitlog.report.openedfiles", 3);
Path logPath = logfile.getPath();
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/EmptyWatcher.java src/main/java/org/apache/hadoop/hbase/zookeeper/EmptyWatcher.java
new file mode 100644
index 0000000..acdbdd0
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/EmptyWatcher.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+
+/**
+ * An empty ZooKeeper watcher
+ */
+@InterfaceAudience.Private
+public class EmptyWatcher implements Watcher {
+ // Used in this package but also by tests so needs to be public
+ public static EmptyWatcher instance = new EmptyWatcher();
+ private EmptyWatcher() {}
+
+ public void process(WatchedEvent event) {}
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
index f9575af..9b85a7e 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -80,7 +81,12 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
* @return Server name or null if timed out.
*/
public ServerName getMasterAddress(final boolean refresh) {
- return ZKUtil.znodeContentToServerName(super.getData(refresh));
+ try {
+ return ServerName.parseFrom(super.getData(refresh));
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse", e);
+ return null;
+ }
}
/**
@@ -99,7 +105,13 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
if (data == null){
throw new IOException("Can't get master address from ZooKeeper; znode data == null");
}
- return ZKUtil.znodeContentToServerName(data);
+ try {
+ return ServerName.parseFrom(data);
+ } catch (DeserializationException e) {
+ KeeperException ke = new KeeperException.DataInconsistencyException();
+ ke.initCause(e);
+ throw ke;
+ }
}
/**
@@ -116,7 +128,7 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
public static boolean setMasterAddress(final ZooKeeperWatcher zkw,
final String znode, final ServerName master)
throws KeeperException {
- return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, getZNodeData(master));
+ return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, toByteArray(master));
}
/**
@@ -132,7 +144,7 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
* @return Content of the master znode as a serialized pb with the pb
* magic as prefix.
*/
- static byte [] getZNodeData(final ServerName sn) {
+ static byte [] toByteArray(final ServerName sn) {
ZooKeeperProtos.Master.Builder mbuilder = ZooKeeperProtos.Master.newBuilder();
HBaseProtos.ServerName.Builder snbuilder = HBaseProtos.ServerName.newBuilder();
snbuilder.setHostName(sn.getHostname());
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
index babde80..0808912 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
@@ -19,17 +19,13 @@ package org.apache.hadoop.hbase.zookeeper;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RootRegionServer;
-import org.apache.hadoop.hbase.util.Addressing;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
-import com.google.protobuf.InvalidProtocolBufferException;
-
/**
* Tracks the root region server location node in zookeeper.
* Root region location is set by {@link RootLocationEditor} usually called
@@ -64,7 +60,12 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
* @throws InterruptedException
*/
public ServerName getRootRegionLocation() throws InterruptedException {
- return ZKUtil.znodeContentToServerName(super.getData(true));
+ try {
+ return ServerName.parseFrom(super.getData(true));
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse", e);
+ return null;
+ }
}
/**
@@ -76,7 +77,11 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
*/
public static ServerName getRootRegionLocation(final ZooKeeperWatcher zkw)
throws KeeperException {
- return ZKUtil.znodeContentToServerName(ZKUtil.getData(zkw, zkw.rootServerZNode));
+ try {
+ return ServerName.parseFrom(ZKUtil.getData(zkw, zkw.rootServerZNode));
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ }
}
/**
@@ -97,7 +102,12 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
LOG.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
}
- return ZKUtil.znodeContentToServerName(super.blockUntilAvailable(timeout, true));
+ try {
+ return ServerName.parseFrom(super.blockUntilAvailable(timeout, true));
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse", e);
+ return null;
+ }
}
/**
@@ -113,7 +123,7 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
LOG.info("Setting ROOT region location in ZooKeeper as " + location);
// Make the RootRegionServer pb and then get its bytes and save this as
// the znode content.
- byte [] data = getZNodeData(location);
+ byte [] data = toByteArray(location);
try {
ZKUtil.createAndWatch(zookeeper, zookeeper.rootServerZNode, data);
} catch(KeeperException.NodeExistsException nee) {
@@ -127,7 +137,7 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
* @param sn What to put into the znode.
* @return The content of the root-region-server znode
*/
- static byte [] getZNodeData(final ServerName sn) {
+ static byte [] toByteArray(final ServerName sn) {
+ // ZNode content is a pb message preceded by some pb magic.
HBaseProtos.ServerName pbsn =
HBaseProtos.ServerName.newBuilder().setHostName(sn.getHostname()).
@@ -164,6 +174,12 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
final long timeout)
throws InterruptedException {
byte [] data = ZKUtil.blockUntilAvailable(zkw, zkw.rootServerZNode, timeout);
- return ZKUtil.znodeContentToServerName(data);
+ if (data == null) return null;
+ try {
+ return ServerName.parseFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse", e);
+ return null;
+ }
}
}
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
index e94b672..0796294 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
@@ -25,9 +25,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.DeserializationException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
+// We should not be importing this Type here, nor a RegionTransition, etc. This class should be
+// about zk and bytes only.
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
@@ -145,10 +149,10 @@ public class ZKAssign {
throws KeeperException, KeeperException.NodeExistsException {
LOG.debug(zkw.prefix("Creating unassigned node for " +
region.getEncodedName() + " in OFFLINE state"));
- RegionTransitionData data = new RegionTransitionData(event,
- region.getRegionName(), serverName);
+ RegionTransition rt =
+ RegionTransition.createRegionTransition(event, region.getRegionName(), serverName);
String node = getNodeName(zkw, region.getEncodedName());
- ZKUtil.createAndWatch(zkw, node, data.getBytes());
+ ZKUtil.createAndWatch(zkw, node, rt.toByteArray());
}
/**
@@ -172,10 +176,10 @@ public class ZKAssign {
throws KeeperException {
LOG.debug(zkw.prefix("Async create of unassigned node for " +
region.getEncodedName() + " with OFFLINE state"));
- RegionTransitionData data = new RegionTransitionData(
- EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
+ RegionTransition rt =
+ RegionTransition.createRegionTransition(EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
String node = getNodeName(zkw, region.getEncodedName());
- ZKUtil.asyncCreate(zkw, node, data.getBytes(), cb, ctx);
+ ZKUtil.asyncCreate(zkw, node, rt.toByteArray(), cb, ctx);
}
/**
@@ -201,10 +205,10 @@ public class ZKAssign {
throws KeeperException, KeeperException.NoNodeException {
LOG.debug(zkw.prefix("Forcing existing unassigned node for " +
region.getEncodedName() + " to OFFLINE state"));
- RegionTransitionData data = new RegionTransitionData(
- EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
+ RegionTransition rt =
+ RegionTransition.createRegionTransition(EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
String node = getNodeName(zkw, region.getEncodedName());
- ZKUtil.setData(zkw, node, data.getBytes());
+ ZKUtil.setData(zkw, node, rt.toByteArray());
}
/**
@@ -267,8 +271,9 @@ public class ZKAssign {
throws KeeperException {
LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " +
region.getEncodedName() + " with OFFLINE state"));
- RegionTransitionData data = new RegionTransitionData(
- EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
+ RegionTransition rt = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_OFFLINE,
+ region.getRegionName(), serverName, HConstants.EMPTY_BYTE_ARRAY);
+ byte [] data = rt.toByteArray();
String node = getNodeName(zkw, region.getEncodedName());
Stat stat = new Stat();
zkw.sync(node);
@@ -282,15 +287,20 @@ public class ZKAssign {
if (hijack && !allowCreation) {
return -1;
}
- return ZKUtil.createAndWatch(zkw, node, data.getBytes());
+ return ZKUtil.createAndWatch(zkw, node, data);
} else {
- RegionTransitionData curDataInZNode = ZKAssign.getDataNoWatch(zkw, region
- .getEncodedName(), stat);
+ byte [] curDataInZNode = ZKAssign.getDataNoWatch(zkw, region.getEncodedName(), stat);
+ RegionTransition curRt;
+ try {
+ curRt = curDataInZNode == null? null: RegionTransition.parseFrom(curDataInZNode);
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ }
// Do not move the node to OFFLINE if znode is in any of the following
// state.
// Because these are already executed states.
- if (hijack && null != curDataInZNode) {
- EventType eventType = curDataInZNode.getEventType();
+ if (hijack && curRt != null) {
+ EventType eventType = curRt.getEventType();
if (eventType.equals(EventType.M_ZK_REGION_CLOSING)
|| eventType.equals(EventType.RS_ZK_REGION_CLOSED)
|| eventType.equals(EventType.RS_ZK_REGION_OPENED)) {
@@ -300,7 +310,7 @@ public class ZKAssign {
boolean setData = false;
try {
- setData = ZKUtil.setData(zkw, node, data.getBytes(), version);
+ setData = ZKUtil.setData(zkw, node, data, version);
// Setdata throws KeeperException which aborts the Master. So we are
// catching it here.
// If just before setting the znode to OFFLINE if the RS has made any
@@ -315,9 +325,13 @@ public class ZKAssign {
} else {
// We successfully forced to OFFLINE, reset watch and handle if
// the state changed in between our set and the watch
- RegionTransitionData curData =
- ZKAssign.getData(zkw, region.getEncodedName());
- if (curData.getEventType() != data.getEventType()) {
+ byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName());
+ try {
+ rt = RegionTransition.parseFrom(bytes);
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ }
+ if (rt.getEventType() != EventType.M_ZK_REGION_OFFLINE) {
// state changed, need to process
return -1;
}
@@ -494,11 +508,17 @@ public class ZKAssign {
// If it came back null, node does not exist.
throw KeeperException.create(Code.NONODE);
}
- RegionTransitionData data = RegionTransitionData.fromBytes(bytes);
- if (!data.getEventType().equals(expectedState)) {
- LOG.warn(zkw.prefix("Attempting to delete unassigned " +
- "node " + regionName + " in " + expectedState +
- " state but node is in " + data.getEventType() + " state"));
+ RegionTransition rt;
+ try {
+ rt = RegionTransition.parseFrom(bytes);
+ } catch (DeserializationException e) {
+ // Convert exception. Otherwise have to alter public api
+ throw ZKUtil.convert(e);
+ }
+ EventType et = rt.getEventType();
+ if (!et.equals(expectedState)) {
+ LOG.warn(zkw.prefix("Attempting to delete unassigned node " + regionName + " in " +
+ expectedState + " state but node is in " + et + " state"));
return false;
}
if (expectedVersion != -1
@@ -564,12 +584,10 @@ public class ZKAssign {
throws KeeperException, KeeperException.NodeExistsException {
LOG.debug(zkw.prefix("Creating unassigned node for " +
region.getEncodedName() + " in a CLOSING state"));
-
- RegionTransitionData data = new RegionTransitionData(
- EventType.M_ZK_REGION_CLOSING, region.getRegionName(), serverName);
-
+ RegionTransition rt = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_CLOSING,
+ region.getRegionName(), serverName, HConstants.EMPTY_BYTE_ARRAY);
String node = getNodeName(zkw, region.getEncodedName());
- return ZKUtil.createAndWatch(zkw, node, data.getBytes());
+ return ZKUtil.createAndWatch(zkw, node, rt.toByteArray());
}
/**
@@ -748,8 +766,7 @@ public class ZKAssign {
ServerName serverName, EventType beginState, EventType endState,
int expectedVersion)
throws KeeperException {
- return transitionNode(zkw, region, serverName, beginState, endState,
- expectedVersion, null);
+ return transitionNode(zkw, region, serverName, beginState, endState, expectedVersion, null);
}
public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region,
@@ -773,11 +790,16 @@ public class ZKAssign {
// Node no longer exists. Return -1. It means unsuccessful transition.
return -1;
}
- RegionTransitionData existingData =
- RegionTransitionData.fromBytes(existingBytes);
+ RegionTransition rt;
+ try {
+ rt = RegionTransition.parseFrom(existingBytes);
+ } catch (DeserializationException e) {
+ // Convert to a zk exception for now. Otherwise have to change API
+ throw ZKUtil.convert(e);
+ }
// Verify it is the expected version
- if(expectedVersion != -1 && stat.getVersion() != expectedVersion) {
+ if (expectedVersion != -1 && stat.getVersion() != expectedVersion) {
LOG.warn(zkw.prefix("Attempt to transition the " +
"unassigned node for " + encoded +
" from " + beginState + " to " + endState + " failed, " +
@@ -799,20 +821,19 @@ public class ZKAssign {
}
// Verify it is in expected state
- if(!existingData.getEventType().equals(beginState)) {
+ EventType et = rt.getEventType();
+ if (!et.equals(beginState)) {
LOG.warn(zkw.prefix("Attempt to transition the " +
"unassigned node for " + encoded +
" from " + beginState + " to " + endState + " failed, " +
- "the node existed but was in the state " + existingData.getEventType() +
- " set by the server " + serverName));
+ "the node existed but was in the state " + et + " set by the server " + serverName));
return -1;
}
// Write new data, ensuring data has not changed since we last read it
try {
- RegionTransitionData data = new RegionTransitionData(endState,
- region.getRegionName(), serverName, payload);
- if(!ZKUtil.setData(zkw, node, data.getBytes(), stat.getVersion())) {
+ rt = RegionTransition.createRegionTransition(endState, region.getRegionName(), serverName, payload);
+ if(!ZKUtil.setData(zkw, node, rt.toByteArray(), stat.getVersion())) {
LOG.warn(zkw.prefix("Attempt to transition the " +
"unassigned node for " + encoded +
" from " + beginState + " to " + endState + " failed, " +
@@ -845,19 +866,14 @@ public class ZKAssign {
*
* @param zkw zk reference
* @param pathOrRegionName fully-specified path or region name
- * @return data for the unassigned node
+ * @return znode content
* @throws KeeperException if unexpected zookeeper exception
*/
- public static RegionTransitionData getData(ZooKeeperWatcher zkw,
+ public static byte [] getData(ZooKeeperWatcher zkw,
String pathOrRegionName)
throws KeeperException {
- String node = pathOrRegionName.startsWith("/") ?
- pathOrRegionName : getNodeName(zkw, pathOrRegionName);
- byte [] data = ZKUtil.getDataAndWatch(zkw, node);
- if(data == null) {
- return null;
- }
- return RegionTransitionData.fromBytes(data);
+ String node = getPath(zkw, pathOrRegionName);
+ return ZKUtil.getDataAndWatch(zkw, node);
}
/**
@@ -871,19 +887,14 @@ public class ZKAssign {
* @param zkw zk reference
* @param pathOrRegionName fully-specified path or region name
* @param stat object to populate the version.
- * @return data for the unassigned node
+ * @return znode content
* @throws KeeperException if unexpected zookeeper exception
*/
- public static RegionTransitionData getDataAndWatch(ZooKeeperWatcher zkw,
+ public static byte [] getDataAndWatch(ZooKeeperWatcher zkw,
String pathOrRegionName, Stat stat)
throws KeeperException {
- String node = pathOrRegionName.startsWith("/") ?
- pathOrRegionName : getNodeName(zkw, pathOrRegionName);
- byte [] data = ZKUtil.getDataAndWatch(zkw, node, stat);
- if(data == null) {
- return null;
- }
- return RegionTransitionData.fromBytes(data);
+ String node = getPath(zkw, pathOrRegionName);
+ return ZKUtil.getDataAndWatch(zkw, node, stat);
}
/**
@@ -897,19 +908,23 @@ public class ZKAssign {
* @param zkw zk reference
* @param pathOrRegionName fully-specified path or region name
* @param stat object to store node info into on getData call
- * @return data for the unassigned node or null if node does not exist
+ * @return znode content
* @throws KeeperException if unexpected zookeeper exception
*/
- public static RegionTransitionData getDataNoWatch(ZooKeeperWatcher zkw,
+ public static byte [] getDataNoWatch(ZooKeeperWatcher zkw,
String pathOrRegionName, Stat stat)
throws KeeperException {
- String node = pathOrRegionName.startsWith("/") ?
- pathOrRegionName : getNodeName(zkw, pathOrRegionName);
- byte [] data = ZKUtil.getDataNoWatch(zkw, node, stat);
- if (data == null) {
- return null;
- }
- return RegionTransitionData.fromBytes(data);
+ String node = getPath(zkw, pathOrRegionName);
+ return ZKUtil.getDataNoWatch(zkw, node, stat);
+ }
+
+ /**
+ * @param zkw
+ * @param pathOrRegionName
+ * @return Path to znode
+ */
+ private static String getPath(final ZooKeeperWatcher zkw, final String pathOrRegionName) {
+ return pathOrRegionName.startsWith("/")? pathOrRegionName : getNodeName(zkw, pathOrRegionName);
}
/**
@@ -983,42 +998,18 @@ public class ZKAssign {
}
/**
- * Verifies that the specified region is in the specified state in ZooKeeper.
- *
- * Returns true if region is in transition and in the specified state in
- * ZooKeeper. Returns false if the region does not exist in ZK or is in
- * a different state.
- *
- * Method synchronizes() with ZK so will yield an up-to-date result but is
- * a slow read.
- * @param zkw
- * @param region
- * @param expectedState
- * @return true if region exists and is in expected state
+ * Presumes bytes are a serialized unassigned data structure
+ * @param znodeBytes
+ * @return String of the deserialized znode bytes.
*/
- public static boolean verifyRegionState(ZooKeeperWatcher zkw,
- HRegionInfo region, EventType expectedState)
- throws KeeperException {
- String encoded = region.getEncodedName();
-
- String node = getNodeName(zkw, encoded);
- zkw.sync(node);
-
- // Read existing data of the node
- byte [] existingBytes = null;
+ static String toString(final byte[] znodeBytes) {
+ // This method should not exist. Used by ZKUtil stringifying RegionTransition. Have the
+ // method in here so RegionTransition does not leak into ZKUtil.
try {
- existingBytes = ZKUtil.getDataAndWatch(zkw, node);
- } catch (KeeperException.NoNodeException nne) {
- return false;
- } catch (KeeperException e) {
- throw e;
- }
- if (existingBytes == null) return false;
- RegionTransitionData existingData =
- RegionTransitionData.fromBytes(existingBytes);
- if (existingData.getEventType() == expectedState){
- return true;
+ RegionTransition rt = RegionTransition.parseFrom(znodeBytes);
+ return rt.toString();
+ } catch (DeserializationException e) {
+ return "";
}
- return false;
}
-}
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
index 30d7fe9..8ca13a8 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
@@ -1,7 +1,5 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -21,13 +19,10 @@ package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,30 +32,23 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
-import org.apache.hadoop.hbase.util.Bytes;
/**
- * Common methods and attributes used by {@link SplitLogManager} and
- * {@link SplitLogWorker}
+ * Common methods and attributes used by {@link SplitLogManager} and {@link SplitLogWorker}
+ * running distributed splitting of WAL logs.
*/
@InterfaceAudience.Private
public class ZKSplitLog {
private static final Log LOG = LogFactory.getLog(ZKSplitLog.class);
- public static final int DEFAULT_TIMEOUT = 25000; // 25 sec
- public static final int DEFAULT_ZK_RETRIES = 3;
- public static final int DEFAULT_MAX_RESUBMIT = 3;
- public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
-
/**
* Gets the full path node name for the log file being split.
* This method will url encode the filename.
* @param zkw zk reference
* @param filename log file name (only the basename)
*/
- public static String getEncodedNodeName(ZooKeeperWatcher zkw,
- String filename) {
- return ZKUtil.joinZNode(zkw.splitLogZNode, encode(filename));
+ public static String getEncodedNodeName(ZooKeeperWatcher zkw, String filename) {
+ return ZKUtil.joinZNode(zkw.splitLogZNode, encode(filename));
}
public static String getFileName(String node) {
@@ -68,8 +56,7 @@ public class ZKSplitLog {
return decode(basename);
}
-
- public static String encode(String s) {
+ static String encode(String s) {
try {
return URLEncoder.encode(s, "UTF-8");
} catch (UnsupportedEncodingException e) {
@@ -77,7 +64,7 @@ public class ZKSplitLog {
}
}
- public static String decode(String s) {
+ static String decode(String s) {
try {
return URLDecoder.decode(s, "UTF-8");
} catch (UnsupportedEncodingException e) {
@@ -107,53 +94,6 @@ public class ZKSplitLog {
return dirname.equals(zkw.splitLogZNode);
}
- public static enum TaskState {
- TASK_UNASSIGNED("unassigned"),
- TASK_OWNED("owned"),
- TASK_RESIGNED("resigned"),
- TASK_DONE("done"),
- TASK_ERR("err");
-
- private final byte[] state;
- private TaskState(String s) {
- state = s.getBytes();
- }
-
- public byte[] get(String serverName) {
- return (Bytes.add(state, " ".getBytes(), serverName.getBytes()));
- }
-
- public String getWriterName(byte[] data) {
- String str = Bytes.toString(data);
- return str.substring(str.indexOf(' ') + 1);
- }
-
-
- /**
- * @param s
- * @return True if {@link #state} is a prefix of s. False otherwise.
- */
- public boolean equals(byte[] s) {
- if (s.length < state.length) {
- return (false);
- }
- for (int i = 0; i < state.length; i++) {
- if (state[i] != s[i]) {
- return (false);
- }
- }
- return (true);
- }
-
- public boolean equals(byte[] s, String serverName) {
- return (Arrays.equals(s, get(serverName)));
- }
- @Override
- public String toString() {
- return new String(state);
- }
- }
-
public static Path getSplitLogDir(Path rootdir, String tmpname) {
return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
}
@@ -172,8 +112,8 @@ public class ZKSplitLog {
return ret;
}
- public static String getSplitLogDirTmpComponent(String worker, String file) {
- return (worker + "_" + ZKSplitLog.encode(file));
+ public static String getSplitLogDirTmpComponent(final String worker, String file) {
+ return worker + "_" + ZKSplitLog.encode(file);
}
public static void markCorrupted(Path rootdir, String tmpname,
@@ -198,81 +138,4 @@ public class ZKSplitLog {
public static boolean isCorruptFlagFile(Path file) {
return file.getName().equals("corrupt");
}
-
-
- public static class Counters {
- //SplitLogManager counters
- public static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0);
- public static AtomicLong tot_mgr_log_split_batch_success =
- new AtomicLong(0);
- public static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0);
- public static AtomicLong tot_mgr_new_unexpected_hlogs = new AtomicLong(0);
- public static AtomicLong tot_mgr_log_split_start = new AtomicLong(0);
- public static AtomicLong tot_mgr_log_split_success = new AtomicLong(0);
- public static AtomicLong tot_mgr_log_split_err = new AtomicLong(0);
- public static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0);
- public static AtomicLong tot_mgr_node_create_result = new AtomicLong(0);
- public static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0);
- public static AtomicLong tot_mgr_node_create_err = new AtomicLong(0);
- public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
- public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
- public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
- public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0);
- public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
- public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
- public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
- public static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0);
- public static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0);
- public static AtomicLong tot_mgr_resubmit = new AtomicLong(0);
- public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
- public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
- public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
- public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0);
- public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
- public static AtomicLong tot_mgr_resubmit_threshold_reached =
- new AtomicLong(0);
- public static AtomicLong tot_mgr_missing_state_in_delete =
- new AtomicLong(0);
- public static AtomicLong tot_mgr_heartbeat = new AtomicLong(0);
- public static AtomicLong tot_mgr_rescan = new AtomicLong(0);
- public static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0);
- public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0);
- public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0);
- public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0);
- public static AtomicLong tot_mgr_resubmit_dead_server_task =
- new AtomicLong(0);
-
-
-
- // SplitLogWorker counters
- public static AtomicLong tot_wkr_failed_to_grab_task_no_data =
- new AtomicLong(0);
- public static AtomicLong tot_wkr_failed_to_grab_task_exception =
- new AtomicLong(0);
- public static AtomicLong tot_wkr_failed_to_grab_task_owned =
- new AtomicLong(0);
- public static AtomicLong tot_wkr_failed_to_grab_task_lost_race =
- new AtomicLong(0);
- public static AtomicLong tot_wkr_task_acquired = new AtomicLong(0);
- public static AtomicLong tot_wkr_task_resigned = new AtomicLong(0);
- public static AtomicLong tot_wkr_task_done = new AtomicLong(0);
- public static AtomicLong tot_wkr_task_err = new AtomicLong(0);
- public static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0);
- public static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0);
- public static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0);
- public static AtomicLong tot_wkr_get_data_result = new AtomicLong(0);
- public static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0);
- public static AtomicLong tot_wkr_preempt_task = new AtomicLong(0);
- public static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0);
- public static AtomicLong tot_wkr_final_transistion_failed =
- new AtomicLong(0);
-
- public static void resetCounters() throws Exception {
- Class<?> cl = (new Counters()).getClass();
- Field[] flds = cl.getDeclaredFields();
- for (Field fld : flds) {
- ((AtomicLong)fld.get(null)).set(0);
- }
- }
- }
-}
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 46a6fde..f725348 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -26,7 +26,6 @@ import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -36,28 +35,20 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.EmptyWatcher;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RootRegionServer;
-import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
-import com.google.protobuf.InvalidProtocolBufferException;
-
/**
* Internal HBase utility class for ZooKeeper.
*
@@ -359,8 +350,7 @@ public class ZKUtil {
* null if parent does not exist
* @throws KeeperException if unexpected zookeeper exception
*/
- public static List<String> listChildrenNoWatch(
- ZooKeeperWatcher zkw, String znode)
+ public static List<String> listChildrenNoWatch(ZooKeeperWatcher zkw, String znode)
 throws KeeperException {
 List<String> children = null;
try {
@@ -376,7 +366,9 @@ public class ZKUtil {
/**
* Simple class to hold a node path and node data.
+ * @deprecated Unused
*/
+ @Deprecated
public static class NodeAndData {
private String node;
private byte [] data;
@@ -392,7 +384,7 @@ public class ZKUtil {
}
@Override
public String toString() {
- return node + " (" + RegionTransitionData.fromBytes(data) + ")";
+ return node;
}
public boolean isEmpty() {
return (data.length == 0);
@@ -600,6 +592,7 @@ public class ZKUtil {
* @return list of data of children of the specified node, an empty list if the node
* exists but has no children, and null if the node does not exist
* @throws KeeperException if unexpected zookeeper exception
+ * @deprecated Unused
*/
 public static List<NodeAndData> getChildDataAndWatchForNewChildren(
ZooKeeperWatcher zkw, String baseNode) throws KeeperException {
@@ -630,6 +623,7 @@ public class ZKUtil {
* @param expectedVersion
* @throws KeeperException if unexpected zookeeper exception
* @throws KeeperException.BadVersionException if version mismatch
+ * @deprecated Unused
*/
public static void updateExistingNodeData(ZooKeeperWatcher zkw, String znode,
byte [] data, int expectedVersion)
@@ -1144,9 +1138,9 @@ public class ZKUtil {
" byte(s) of data from znode " + znode +
(watcherSet? " and set watcher; ": "; data=") +
(data == null? "null": data.length == 0? "empty": (
- znode.startsWith(zkw.assignmentZNode) ?
- RegionTransitionData.fromBytes(data).toString()
- : StringUtils.abbreviate(Bytes.toStringBinary(data), 32)))));
+ znode.startsWith(zkw.assignmentZNode)?
+ ZKAssign.toString(data): // We should not be doing this reaching into another class
+ StringUtils.abbreviate(Bytes.toStringBinary(data), 32)))));
}
/**
@@ -1222,44 +1216,14 @@ public class ZKUtil {
/**
- * Get a ServerName from the passed in znode data bytes.
- * @param data ZNode data with a server name in it; can handle the old style
- * servername where servername was host and port. Works too with data that
- * begins w/ the pb 'PBUF' magic and that its then followed by a protobuf that
- * has a serialized {@link ServerName} in it.
- * @return Returns null if data is null else converts passed data
- * to a ServerName instance.
+ * Convert a {@link DeserializationException} to a more palatable {@link KeeperException}.
+ * Used when can't let a {@link DeserializationException} out w/o changing public API.
+ * @param e Exception to convert
+ * @return Converted exception
*/
- public static ServerName znodeContentToServerName(final byte [] data) {
- if (data == null || data.length <= 0) return null;
- if (ProtobufUtil.isPBMagicPrefix(data)) {
- int prefixLen = ProtobufUtil.lengthOfPBMagic();
- try {
- RootRegionServer rss =
- RootRegionServer.newBuilder().mergeFrom(data, prefixLen, data.length - prefixLen).build();
- HBaseProtos.ServerName sn = rss.getServer();
- return new ServerName(sn.getHostName(), sn.getPort(), sn.getStartCode());
- } catch (InvalidProtocolBufferException e) {
- // A failed parse of the znode is pretty catastrophic. Rather than loop
- // retrying hoping the bad bytes will changes, and rather than change
- // the signature on this method to add an IOE which will send ripples all
- // over the code base, throw a RuntimeException. This should "never" happen.
- // Fail fast if it does.
- throw new RuntimeException(e);
- }
- }
- // The str returned could be old style -- pre hbase-1502 -- which was
- // hostname and port seperated by a colon rather than hostname, port and
- // startcode delimited by a ','.
- String str = Bytes.toString(data);
- int index = str.indexOf(ServerName.SERVERNAME_SEPARATOR);
- if (index != -1) {
- // Presume its ServerName serialized with versioned bytes.
- return ServerName.parseVersionedServerName(data);
- }
- // Presume it a hostname:port format.
- String hostname = Addressing.parseHostname(str);
- int port = Addressing.parsePort(str);
- return new ServerName(hostname, port, -1L);
+ public static KeeperException convert(final DeserializationException e) {
+ KeeperException ke = new KeeperException.DataInconsistencyException();
+ ke.initCause(e);
+ return ke;
}
}
\ No newline at end of file
diff --git src/main/protobuf/ZooKeeper.proto src/main/protobuf/ZooKeeper.proto
index 961ab65..98142ce 100644
--- src/main/protobuf/ZooKeeper.proto
+++ src/main/protobuf/ZooKeeper.proto
@@ -60,3 +60,33 @@ message ClusterUp {
// the data is cluster startDate.
required string startDate = 1;
}
+
+/**
+ * What we write under unassigned up in zookeeper as a region moves through
+ * open/close, etc., regions. Details a region in transition.
+ */
+message RegionTransition {
+ // Code for EventType gotten by doing o.a.h.h.EventHandler.EventType.getCode()
+ required uint32 eventTypeCode = 1;
+ // Full regionname in bytes
+ required bytes regionName = 2;
+ required uint64 createTime = 3;
+ optional ServerName originServerName = 4;
+ optional bytes payload = 5;
+}
+
+/**
+ * WAL SplitLog directory znodes have this for content. Used doing distributed
+ * WAL splitting. Holds current state and name of server that originated split.
+ */
+message SplitLogTask {
+ enum State {
+ UNASSIGNED = 0;
+ OWNED = 1;
+ RESIGNED = 2;
+ DONE = 3;
+ ERR = 4;
+ }
+ required State state = 1;
+ required ServerName serverName = 2;
+}
diff --git src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index c3a1889..3a1dbb8 100644
--- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
diff --git src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
index d496d48..1391dc8 100644
--- src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
+++ src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
diff --git src/test/java/org/apache/hadoop/hbase/master/Mocking.java src/test/java/org/apache/hadoop/hbase/master/Mocking.java
index 676d6bb..51f9308 100644
--- src/test/java/org/apache/hadoop/hbase/master/Mocking.java
+++ src/test/java/org/apache/hadoop/hbase/master/Mocking.java
@@ -23,9 +23,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
@@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -71,12 +74,13 @@ public class Mocking {
* @param sn Name of the regionserver doing the 'opening'
* @param hri Region we're 'opening'.
* @throws KeeperException
+ * @throws DeserializationException
*/
static void fakeRegionServerRegionOpenInZK(final ZooKeeperWatcher w,
final ServerName sn, final HRegionInfo hri)
- throws KeeperException {
+ throws KeeperException, DeserializationException {
// Wait till we see the OFFLINE zk node before we proceed.
- while (!ZKAssign.verifyRegionState(w, hri, EventType.M_ZK_REGION_OFFLINE)) {
+ while (!verifyRegionState(w, hri, EventType.M_ZK_REGION_OFFLINE)) {
Threads.sleep(1);
}
// Get current versionid else will fail on transition from OFFLINE to OPENING below
@@ -94,4 +98,40 @@ public class Mocking {
// We should be done now. The master open handler will notice the
// transition and remove this regions znode.
}
+
+ /**
+ * Verifies that the specified region is in the specified state in ZooKeeper.
+ *
+ * Returns true if region is in transition and in the specified state in
+ * ZooKeeper. Returns false if the region does not exist in ZK or is in
+ * a different state.
+ *
+ * Method synchronizes() with ZK so will yield an up-to-date result but is
+ * a slow read.
+ * @param zkw
+ * @param region
+ * @param expectedState
+ * @return true if region exists and is in expected state
+ * @throws DeserializationException
+ */
+ static boolean verifyRegionState(ZooKeeperWatcher zkw, HRegionInfo region, EventType expectedState)
+ throws KeeperException, DeserializationException {
+ String encoded = region.getEncodedName();
+
+ String node = ZKAssign.getNodeName(zkw, encoded);
+ zkw.sync(node);
+
+ // Read existing data of the node
+ byte [] existingBytes = null;
+ try {
+ existingBytes = ZKUtil.getDataAndWatch(zkw, node);
+ } catch (KeeperException.NoNodeException nne) {
+ return false;
+ } catch (KeeperException e) {
+ throw e;
+ }
+ if (existingBytes == null) return false;
+ RegionTransition rt = RegionTransition.parseFrom(existingBytes);
+ return rt.getEventType().equals(expectedState);
+ }
}
diff --git src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 36046f8..c3ccf21 100644
--- src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -29,23 +29,24 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.executor.ExecutorService;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -61,10 +62,9 @@ import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.hbase.MediumTests;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Watcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -159,10 +159,11 @@ public class TestAssignmentManager {
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
+ * @throws DeserializationException
*/
@Test(timeout = 5000)
public void testBalanceOnMasterFailoverScenarioWithOpenedNode()
- throws IOException, KeeperException, InterruptedException, ServiceException {
+ throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException {
AssignmentManagerWithExtrasForTesting am =
setUpMockedAssignmentManager(this.server, this.serverManager);
try {
@@ -177,7 +178,7 @@ public class TestAssignmentManager {
int versionid =
ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
assertNotSame(versionid, -1);
- while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO,
+ while (!Mocking.verifyRegionState(this.watcher, REGIONINFO,
EventType.M_ZK_REGION_OFFLINE)) {
Threads.sleep(1);
}
@@ -205,7 +206,7 @@ public class TestAssignmentManager {
@Test(timeout = 5000)
public void testBalanceOnMasterFailoverScenarioWithClosedNode()
- throws IOException, KeeperException, InterruptedException, ServiceException {
+ throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException {
AssignmentManagerWithExtrasForTesting am =
setUpMockedAssignmentManager(this.server, this.serverManager);
try {
@@ -221,7 +222,7 @@ public class TestAssignmentManager {
ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
assertNotSame(versionid, -1);
am.gate.set(false);
- while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO,
+ while (!Mocking.verifyRegionState(this.watcher, REGIONINFO,
EventType.M_ZK_REGION_OFFLINE)) {
Threads.sleep(1);
}
@@ -249,7 +250,7 @@ public class TestAssignmentManager {
@Test(timeout = 5000)
public void testBalanceOnMasterFailoverScenarioWithOfflineNode()
- throws IOException, KeeperException, InterruptedException, ServiceException {
+ throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException {
AssignmentManagerWithExtrasForTesting am =
setUpMockedAssignmentManager(this.server, this.serverManager);
try {
@@ -264,7 +265,7 @@ public class TestAssignmentManager {
int versionid =
ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
assertNotSame(versionid, -1);
- while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO,
+ while (!Mocking.verifyRegionState(this.watcher, REGIONINFO,
EventType.M_ZK_REGION_OFFLINE)) {
Threads.sleep(1);
}
@@ -306,10 +307,11 @@ public class TestAssignmentManager {
* from one server to another mocking regionserver responding over zk.
* @throws IOException
* @throws KeeperException
+ * @throws DeserializationException
*/
@Test
public void testBalance()
- throws IOException, KeeperException {
+ throws IOException, KeeperException, DeserializationException {
// Create and startup an executor. This is used by AssignmentManager
// handling zk callbacks.
ExecutorService executor = startupMasterExecutor("testBalanceExecutor");
@@ -345,7 +347,7 @@ public class TestAssignmentManager {
// balancer. The zk node will be OFFLINE waiting for regionserver to
// transition it through OPENING, OPENED. Wait till we see the OFFLINE
// zk node before we proceed.
- while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO, EventType.M_ZK_REGION_OFFLINE)) {
+ while (!Mocking.verifyRegionState(this.watcher, REGIONINFO, EventType.M_ZK_REGION_OFFLINE)) {
Threads.sleep(1);
}
// Get current versionid else will fail on transition from OFFLINE to OPENING below
@@ -541,12 +543,12 @@ public class TestAssignmentManager {
private static int createNodeSplitting(final ZooKeeperWatcher zkw,
final HRegionInfo region, final ServerName serverName)
throws KeeperException, IOException {
- RegionTransitionData data =
- new RegionTransitionData(EventType.RS_ZK_REGION_SPLITTING,
+ RegionTransition rt =
+ RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_SPLITTING,
region.getRegionName(), serverName);
String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
- if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, data.getBytes())) {
+ if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
throw new IOException("Failed create of ephemeral " + node);
}
// Transition node from SPLITTING to SPLITTING and pick up version so we
@@ -650,12 +652,12 @@ public class TestAssignmentManager {
deadServers);
}
@Override
- void processRegionsInTransition(final RegionTransitionData data,
+ void processRegionsInTransition(final RegionTransition rt,
final HRegionInfo regionInfo,
 final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
final int expectedVersion) throws KeeperException {
while (this.gate.get()) Threads.sleep(1);
- super.processRegionsInTransition(data, regionInfo, deadServers, expectedVersion);
+ super.processRegionsInTransition(rt, regionInfo, deadServers, expectedVersion);
}
/** reset the watcher */
diff --git src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 2669876..f97cd2c 100644
--- src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -19,7 +19,7 @@
*/
package org.apache.hadoop.hbase.master;
-import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+import static org.apache.hadoop.hbase.SplitLogCounters.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -83,7 +82,7 @@ public class TestDistributedLogSplitting {
HBaseTestingUtility TEST_UTIL;
private void startCluster(int num_rs) throws Exception{
- ZKSplitLog.Counters.resetCounters();
+ SplitLogCounters.resetCounters();
LOG.info("Starting cluster");
conf = HBaseConfiguration.create();
conf.getLong("hbase.splitlog.max.resubmit", 0);
diff --git src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index 14cdb90..59ad92c 100644
--- src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -475,8 +474,9 @@ public class TestMasterFailover {
ZKAssign.createNodeOffline(zkw, region, serverName);
hrs.openRegion(region);
while (true) {
- RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
- if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+ byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName());
+ RegionTransition rt = RegionTransition.parseFrom(bytes);
+ if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) {
break;
}
Thread.sleep(100);
@@ -488,8 +488,9 @@ public class TestMasterFailover {
ZKAssign.createNodeOffline(zkw, region, serverName);
hrs.openRegion(region);
while (true) {
- RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
- if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+ byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName());
+ RegionTransition rt = RegionTransition.parseFrom(bytes);
+ if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) {
break;
}
Thread.sleep(100);
@@ -835,8 +836,9 @@ public class TestMasterFailover {
ZKAssign.createNodeOffline(zkw, region, deadServerName);
hrsDead.openRegion(region);
while (true) {
- RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
- if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+ byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName());
+ RegionTransition rt = RegionTransition.parseFrom(bytes);
+ if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) {
break;
}
Thread.sleep(100);
@@ -850,8 +852,9 @@ public class TestMasterFailover {
ZKAssign.createNodeOffline(zkw, region, deadServerName);
hrsDead.openRegion(region);
while (true) {
- RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
- if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+ byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName());
+ RegionTransition rt = RegionTransition.parseFrom(bytes);
+ if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) {
break;
}
Thread.sleep(100);
@@ -869,8 +872,9 @@ public class TestMasterFailover {
ZKAssign.createNodeOffline(zkw, region, deadServerName);
hrsDead.openRegion(region);
while (true) {
- RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
- if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+ byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName());
+ RegionTransition rt = RegionTransition.parseFrom(bytes);
+ if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) {
ZKAssign.deleteOpenedNode(zkw, region.getEncodedName());
break;
}
@@ -885,8 +889,9 @@ public class TestMasterFailover {
ZKAssign.createNodeOffline(zkw, region, deadServerName);
hrsDead.openRegion(region);
while (true) {
- RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
- if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+ byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName());
+ RegionTransition rt = RegionTransition.parseFrom(bytes);
+ if (rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_OPENED)) {
ZKAssign.deleteOpenedNode(zkw, region.getEncodedName());
break;
}
diff --git src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
index f8029ba..d16d156 100644
--- src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
+++ src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
@@ -27,6 +27,7 @@ import java.net.UnknownHostException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -227,10 +228,11 @@ public class TestMasterNoCluster {
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
+ * @throws DeserializationException
*/
@Test
public void testCatalogDeploys()
- throws IOException, KeeperException, InterruptedException {
+ throws IOException, KeeperException, InterruptedException, DeserializationException {
final Configuration conf = TESTUTIL.getConfiguration();
final long now = System.currentTimeMillis();
// Name for our single mocked up regionserver.
diff --git src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index 0f7d54e..9857aa4 100644
--- src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -19,27 +19,43 @@
*/
package org.apache.hadoop.hbase.master;
-import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
-import static org.junit.Assert.*;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressManager.NodeCreationListener;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.log4j.Level;
@@ -57,6 +73,7 @@ import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestSplitLogManager {
private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
+ private static final ServerName DUMMY_MASTER = new ServerName("dummy-master,1,1");
static {
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
}
@@ -180,16 +197,16 @@ public class TestSplitLogManager {
@Test
public void testTaskCreation() throws Exception {
LOG.info("TestTaskCreation - test the creation of a task in zk");
-
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
byte[] data = ZKUtil.getData(zkw, tasknode);
- LOG.info("Task node created " + new String(data));
- assertTrue(TaskState.TASK_UNASSIGNED.equals(data, "dummy-master"));
+ SplitLogTask slt = SplitLogTask.parseFrom(data);
+ LOG.info("Task node created " + slt.toString());
+ assertTrue(slt.isUnassigned(DUMMY_MASTER));
}
@Test
@@ -197,8 +214,8 @@ public class TestSplitLogManager {
LOG.info("TestOrphanTaskAcquisition");
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
- zkw.getRecoverableZooKeeper().create(tasknode,
- TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+ SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER);
+ zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
int to = 1000;
@@ -207,7 +224,7 @@ public class TestSplitLogManager {
to = to + 2 * 100;
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
Task task = slm.findOrCreateOrphanTask(tasknode);
@@ -229,12 +246,12 @@ public class TestSplitLogManager {
" startup");
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
//create an unassigned orphan task
- zkw.getRecoverableZooKeeper().create(tasknode,
- TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+ SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER);
+ zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
int version = ZKUtil.checkExists(zkw, tasknode);
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
Task task = slm.findOrCreateOrphanTask(tasknode);
@@ -263,24 +280,29 @@ public class TestSplitLogManager {
to = to + 2 * 100;
conf.setInt("hbase.splitlog.max.resubmit", 2);
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
int version = ZKUtil.checkExists(zkw, tasknode);
-
- ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
+ final ServerName worker1 = new ServerName("worker1,1,1");
+ final ServerName worker2 = new ServerName("worker2,1,1");
+ final ServerName worker3 = new ServerName("worker3,1,1");
+ SplitLogTask slt = new SplitLogTask.Owned(worker1);
+ ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
waitForCounter(tot_mgr_resubmit, 0, 1, to + EXTRA_TOLERANCE_MS);
int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version);
- ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker2"));
+ slt = new SplitLogTask.Owned(worker2);
+ ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 1, 2, 1000);
waitForCounter(tot_mgr_resubmit, 1, 2, to + 100);
int version2 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version2 > version1);
- ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker3"));
+ slt = new SplitLogTask.Owned(worker3);
+ ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 1, 2, 1000);
waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + EXTRA_TOLERANCE_MS);
Thread.sleep(to + EXTRA_TOLERANCE_MS);
@@ -293,14 +315,15 @@ public class TestSplitLogManager {
conf.setInt("hbase.splitlog.manager.timeout", 1000);
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
int version = ZKUtil.checkExists(zkw, tasknode);
-
- ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
+ final ServerName worker1 = new ServerName("worker1,1,1");
+ SplitLogTask slt = new SplitLogTask.Owned(worker1);
+ ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
waitForCounter(new Expr() {
@Override
@@ -312,8 +335,8 @@ public class TestSplitLogManager {
int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version);
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
- assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
- taskstate));
+ slt = SplitLogTask.parseFrom(taskstate);
+ assertTrue(slt.isUnassigned(DUMMY_MASTER));
waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
} else {
@@ -327,11 +350,13 @@ public class TestSplitLogManager {
public void testTaskDone() throws Exception {
LOG.info("TestTaskDone - cleanup task node once in DONE state");
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
- ZKUtil.setData(zkw, tasknode, TaskState.TASK_DONE.get("worker"));
+ final ServerName worker1 = new ServerName("worker1,1,1");
+ SplitLogTask slt = new SplitLogTask.Done(worker1);
+ ZKUtil.setData(zkw, tasknode, slt.toByteArray());
synchronized (batch) {
while (batch.installed != batch.done) {
batch.wait();
@@ -346,12 +371,14 @@ public class TestSplitLogManager {
LOG.info("TestTaskErr - cleanup task node once in ERR state");
conf.setInt("hbase.splitlog.max.resubmit", 0);
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
- ZKUtil.setData(zkw, tasknode, TaskState.TASK_ERR.get("worker"));
+ final ServerName worker1 = new ServerName("worker1,1,1");
+ SplitLogTask slt = new SplitLogTask.Err(worker1);
+ ZKUtil.setData(zkw, tasknode, slt.toByteArray());
synchronized (batch) {
while (batch.installed != batch.error) {
batch.wait();
@@ -359,18 +386,20 @@ public class TestSplitLogManager {
}
waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
- conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT);
+ conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT);
}
@Test
public void testTaskResigned() throws Exception {
LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
- ZKUtil.setData(zkw, tasknode, TaskState.TASK_RESIGNED.get("worker"));
+ final ServerName worker1 = new ServerName("worker1,1,1");
+ SplitLogTask slt = new SplitLogTask.Resigned(worker1);
+ ZKUtil.setData(zkw, tasknode, slt.toByteArray());
int version = ZKUtil.checkExists(zkw, tasknode);
waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
@@ -378,8 +407,8 @@ public class TestSplitLogManager {
assertTrue(version1 > version);
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
- assertTrue(Arrays.equals(taskstate,
- TaskState.TASK_UNASSIGNED.get("dummy-master")));
+ slt = SplitLogTask.parseFrom(taskstate);
+ assertTrue(slt.isUnassigned(DUMMY_MASTER));
}
@Test
@@ -389,8 +418,9 @@ public class TestSplitLogManager {
// create an orphan task in OWNED state
String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
- zkw.getRecoverableZooKeeper().create(tasknode1,
- TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+ final ServerName worker1 = new ServerName("worker1,1,1");
+ SplitLogTask slt = new SplitLogTask.Owned(worker1);
+ zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
int to = 1000;
@@ -399,7 +429,7 @@ public class TestSplitLogManager {
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
@@ -411,8 +441,9 @@ public class TestSplitLogManager {
// keep updating the orphan owned node every to/2 seconds
for (int i = 0; i < (3 * to)/100; i++) {
Thread.sleep(100);
- ZKUtil.setData(zkw, tasknode1,
- TaskState.TASK_OWNED.get("dummy-worker"));
+ final ServerName sameWorker = new ServerName("worker1,1,1");
+ slt = new SplitLogTask.Owned(sameWorker);
+ ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
}
// since we have stopped heartbeating the owned node therefore it should
@@ -429,14 +460,15 @@ public class TestSplitLogManager {
LOG.info("testDeadWorker");
conf.setLong("hbase.splitlog.max.resubmit", 0);
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
int version = ZKUtil.checkExists(zkw, tasknode);
-
- ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
+ final ServerName worker1 = new ServerName("worker1,1,1");
+ SplitLogTask slt = new SplitLogTask.Owned(worker1);
+ ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
slm.handleDeadWorker("worker1");
waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
@@ -445,15 +477,15 @@ public class TestSplitLogManager {
int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version);
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
- assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
- taskstate));
+ slt = SplitLogTask.parseFrom(taskstate);
+ assertTrue(slt.isUnassigned(DUMMY_MASTER));
return;
}
@Test
public void testEmptyLogDir() throws Exception {
LOG.info("testEmptyLogDir");
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
@@ -467,7 +499,7 @@ public class TestSplitLogManager {
public void testVanishingTaskZNode() throws Exception {
LOG.info("testVanishingTaskZNode");
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
- slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
slm.finishInitialization();
FileSystem fs = TEST_UTIL.getTestFileSystem();
final Path logDir = new Path(fs.getWorkingDirectory(),
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 26b9865..324b7a6 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -19,12 +19,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.resetCounters;
-import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_lost_race;
-import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_owned;
-import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_preempt_task;
-import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_task_acquired;
-import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_task_acquired_rescan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -35,10 +29,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.log4j.Level;
@@ -53,6 +48,7 @@ import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestSplitLogWorker {
private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
+ private static final ServerName MANAGER = new ServerName("manager,1,1");
static {
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
}
@@ -98,7 +94,7 @@ public class TestSplitLogWorker {
ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
LOG.debug(zkw.splitLogZNode + " created");
- resetCounters();
+ SplitLogCounters.resetCounters();
}
@@ -129,19 +125,20 @@ public class TestSplitLogWorker {
@Test
public void testAcquireTaskAtStartup() throws Exception {
LOG.info("testAcquireTaskAtStartup");
- ZKSplitLog.Counters.resetCounters();
-
- zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
- TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
+ SplitLogCounters.resetCounters();
+ final String TATAS = "tatas";
+ final ServerName RS = new ServerName("rs,1,1");
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
+ new SplitLogTask.Unassigned(new ServerName("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
- "rs", neverEndingTask);
+ SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), RS, neverEndingTask);
slw.start();
try {
- waitForCounter(tot_wkr_task_acquired, 0, 1, 100);
- assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
- ZKSplitLog.getEncodedNodeName(zkw, "tatas")), "rs"));
+ waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 100);
+ byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
+ SplitLogTask slt = SplitLogTask.parseFrom(bytes);
+ assertTrue(slt.isOwned(RS));
} finally {
stopSplitLogWorker(slw);
}
@@ -161,28 +158,27 @@ public class TestSplitLogWorker {
@Test
public void testRaceForTask() throws Exception {
LOG.info("testRaceForTask");
- ZKSplitLog.Counters.resetCounters();
-
- zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
- TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+ SplitLogCounters.resetCounters();
+ final String TRFT = "trft";
+ final ServerName SVR1 = new ServerName("svr1,1,1");
+ final ServerName SVR2 = new ServerName("svr2,1,1");
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
+ new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
- "svr1", neverEndingTask);
- SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
- "svr2", neverEndingTask);
+ SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR1, neverEndingTask);
+ SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR2, neverEndingTask);
slw1.start();
slw2.start();
try {
- waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+ waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000);
// Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
// not it, that we fell through to the next counter in line and it was set.
- assertTrue(waitForCounterBoolean(tot_wkr_failed_to_grab_task_owned, 0, 1, 1000) ||
- tot_wkr_failed_to_grab_task_lost_race.get() == 1);
- assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
- ZKSplitLog.getEncodedNodeName(zkw, "trft")), "svr1") ||
- TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
- ZKSplitLog.getEncodedNodeName(zkw, "trft")), "svr2"));
+ assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1000) ||
+ SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
+ byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
+ SplitLogTask slt = SplitLogTask.parseFrom(bytes);
+ assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
} finally {
stopSplitLogWorker(slw1);
stopSplitLogWorker(slw2);
@@ -192,28 +188,28 @@ public class TestSplitLogWorker {
@Test
public void testPreemptTask() throws Exception {
LOG.info("testPreemptTask");
- ZKSplitLog.Counters.resetCounters();
-
- SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
- "tpt_svr", neverEndingTask);
+ SplitLogCounters.resetCounters();
+ final ServerName SRV = new ServerName("tpt_svr,1,1");
+ final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
+ SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
slw.start();
try {
Thread.yield(); // let the worker start
Thread.sleep(100);
// this time create a task node after starting the splitLogWorker
- zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
- TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+ zkw.getRecoverableZooKeeper().create(PATH,
+ new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+ waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000);
assertEquals(1, slw.taskReadySeq);
- assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
- ZKSplitLog.getEncodedNodeName(zkw, "tpt_task")), "tpt_svr"));
-
- ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
- TaskState.TASK_UNASSIGNED.get("manager"));
- waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+ byte [] bytes = ZKUtil.getData(zkw, PATH);
+ SplitLogTask slt = SplitLogTask.parseFrom(bytes);
+ assertTrue(slt.isOwned(SRV));
+ slt = new SplitLogTask.Unassigned(MANAGER);
+ ZKUtil.setData(zkw, PATH, slt.toByteArray());
+ waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1000);
} finally {
stopSplitLogWorker(slw);
}
@@ -222,35 +218,37 @@ public class TestSplitLogWorker {
@Test
public void testMultipleTasks() throws Exception {
LOG.info("testMultipleTasks");
- ZKSplitLog.Counters.resetCounters();
- SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
- "tmt_svr", neverEndingTask);
+ SplitLogCounters.resetCounters();
+ final ServerName SRV = new ServerName("tmt_svr,1,1");
+ final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
+ SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
slw.start();
try {
Thread.yield(); // let the worker start
Thread.sleep(100);
+ SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
+ zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
- TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+ waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000);
// now the worker is busy doing the above task
// create another task
- zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
- TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
+ zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// preempt the first task, have it owned by another worker
- ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
- TaskState.TASK_OWNED.get("another-worker"));
- waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+ final ServerName anotherWorker = new ServerName("another-worker,1,1");
+ SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
+ ZKUtil.setData(zkw, PATH1, slt.toByteArray());
+ waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1000);
- waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
+ waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 1000);
assertEquals(2, slw.taskReadySeq);
- assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
- ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2")), "tmt_svr"));
+ byte [] bytes = ZKUtil.getData(zkw, PATH2);
+ slt = SplitLogTask.parseFrom(bytes);
+ assertTrue(slt.isOwned(SRV));
} finally {
stopSplitLogWorker(slw);
}
@@ -259,38 +257,37 @@ public class TestSplitLogWorker {
@Test
public void testRescan() throws Exception {
LOG.info("testRescan");
- ZKSplitLog.Counters.resetCounters();
- slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
- "svr", neverEndingTask);
+ SplitLogCounters.resetCounters();
+ final ServerName SRV = new ServerName("svr,1,1");
+ slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
slw.start();
Thread.yield(); // let the worker start
Thread.sleep(100);
String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
- zkw.getRecoverableZooKeeper().create(task,
- TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+ SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
+ zkw.getRecoverableZooKeeper().create(task, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+ waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000);
// now the worker is busy doing the above task
// preempt the task, have it owned by another worker
- ZKUtil.setData(zkw, task, TaskState.TASK_UNASSIGNED.get("manager"));
- waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+ ZKUtil.setData(zkw, task, slt.toByteArray());
+ waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1000);
// create a RESCAN node
String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
- rescan = zkw.getRecoverableZooKeeper().create(rescan,
- TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+ rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
- waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
+ waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 1000);
// RESCAN node might not have been processed if the worker became busy
// with the above task. preempt the task again so that now the RESCAN
// node is processed
- ZKUtil.setData(zkw, task, TaskState.TASK_UNASSIGNED.get("manager"));
- waitForCounter(tot_wkr_preempt_task, 1, 2, 1000);
- waitForCounter(tot_wkr_task_acquired_rescan, 0, 1, 1000);
+ ZKUtil.setData(zkw, task, slt.toByteArray());
+ waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, 1000);
+ waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, 1000);
List nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
LOG.debug(nodes);
@@ -301,8 +298,8 @@ public class TestSplitLogWorker {
String name = ZKSplitLog.getEncodedNodeName(zkw, node);
String fn = ZKSplitLog.getFileName(name);
byte [] data = ZKUtil.getData(zkw, ZKUtil.joinZNode(zkw.splitLogZNode, fn));
- String datastr = Bytes.toString(data);
- assertTrue("data=" + datastr, TaskState.TASK_DONE.equals(data, "svr"));
+ slt = SplitLogTask.parseFrom(data);
+ assertTrue(slt.toString(), slt.isDone(SRV));
}
}
assertEquals(2, num);
@@ -311,5 +308,4 @@ public class TestSplitLogWorker {
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
-}
-
+}
\ No newline at end of file
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 75b5aea..65fa948 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@@ -98,9 +97,10 @@ public class TestSplitTransactionOnCluster {
* @throws InterruptedException
* @throws NodeExistsException
* @throws KeeperException
+ * @throws DeserializationException
*/
@Test (timeout = 300000) public void testRSSplitEphemeralsDisappearButDaughtersAreOnlinedAfterShutdownHandling()
- throws IOException, InterruptedException, NodeExistsException, KeeperException {
+ throws IOException, InterruptedException, NodeExistsException, KeeperException, DeserializationException {
final byte [] tableName =
Bytes.toBytes("ephemeral");
@@ -137,12 +137,12 @@ public class TestSplitTransactionOnCluster {
Stat stats =
TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
- RegionTransitionData rtd =
- ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(),
- hri.getEncodedName());
+ RegionTransition rt =
+ RegionTransition.parseFrom(ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(),
+ hri.getEncodedName()));
// State could be SPLIT or SPLITTING.
- assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) ||
- rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLITTING));
+ assertTrue(rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) ||
+ rt.getEventType().equals(EventType.RS_ZK_REGION_SPLITTING));
// Now crash the server
cluster.abortRegionServer(tableRegionIndex);
waitUntilRegionServerDead();
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
index 07f8fc4..567ef10 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
@@ -25,14 +25,15 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
@@ -133,9 +134,10 @@ public class TestCloseRegionHandler {
* @throws IOException
* @throws NodeExistsException
* @throws KeeperException
+ * @throws DeserializationException
*/
@Test public void testZKClosingNodeVersionMismatch()
- throws IOException, NodeExistsException, KeeperException {
+ throws IOException, NodeExistsException, KeeperException, DeserializationException {
final Server server = new MockServer(HTU);
final MockRegionServerServices rss = new MockRegionServerServices();
rss.setFileSystem(HTU.getTestFileSystem());
@@ -160,9 +162,9 @@ public class TestCloseRegionHandler {
handler.process();
// Handler should remain in M_ZK_REGION_CLOSING
- RegionTransitionData data =
- ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName());
- assertTrue(EventType.M_ZK_REGION_CLOSING == data.getEventType());
+ RegionTransition rt =
+ RegionTransition.parseFrom(ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName()));
+ assertTrue(rt.getEventType().equals(EventType.M_ZK_REGION_CLOSING ));
}
/**
@@ -170,9 +172,10 @@ public class TestCloseRegionHandler {
* @throws IOException
* @throws NodeExistsException
* @throws KeeperException
+ * @throws DeserializationException
*/
@Test public void testCloseRegion()
- throws IOException, NodeExistsException, KeeperException {
+ throws IOException, NodeExistsException, KeeperException, DeserializationException {
final Server server = new MockServer(HTU);
final MockRegionServerServices rss = new MockRegionServerServices();
rss.setFileSystem(HTU.getTestFileSystem());
@@ -196,26 +199,23 @@ public class TestCloseRegionHandler {
versionOfClosingNode);
handler.process();
// Handler should have transitioned it to RS_ZK_REGION_CLOSED
- RegionTransitionData data =
- ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName());
- assertTrue(EventType.RS_ZK_REGION_CLOSED == data.getEventType());
+ RegionTransition rt = RegionTransition.parseFrom(
+ ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName()));
+ assertTrue(rt.getEventType().equals(EventType.RS_ZK_REGION_CLOSED));
}
+
private void OpenRegion(Server server, RegionServerServices rss,
- HTableDescriptor htd, HRegionInfo hri)
- throws IOException, NodeExistsException, KeeperException {
- // Create it OFFLINE node, which is what Master set before sending OPEN RPC
- ZKAssign.createNodeOffline(server.getZooKeeper(), hri,
- server.getServerName());
- OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri,
- htd);
- openHandler.process();
- RegionTransitionData data =
- ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName());
-
- // delete the node, which is what Master do after the region is opened
- ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
- EventType.RS_ZK_REGION_OPENED);
- }
+ HTableDescriptor htd, HRegionInfo hri)
+ throws IOException, NodeExistsException, KeeperException, DeserializationException {
+ // Create its OFFLINE node, which is what the Master sets before sending the OPEN RPC
+ ZKAssign.createNodeOffline(server.getZooKeeper(), hri, server.getServerName());
+ OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri, htd);
+ openHandler.process();
+ // NOTE(review): parse result is discarded — presumably a sanity check that the znode data deserializes; confirm, or remove this call
+ RegionTransition.parseFrom(ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName()));
+ // Delete the node, which is what the Master does after the region is opened
+ ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), EventType.RS_ZK_REGION_OPENED);
+ }
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java
index 55a8c4a..0e7cfee 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -148,9 +147,9 @@ public class TestOpenRegionHandler {
handler.process();
// Handler should have transitioned it to FAILED_OPEN
- RegionTransitionData data =
- ZKAssign.getData(server.getZooKeeper(), TEST_HRI.getEncodedName());
- assertEquals(EventType.RS_ZK_REGION_FAILED_OPEN, data.getEventType());
+ RegionTransition rt = RegionTransition.parseFrom(
+ ZKAssign.getData(server.getZooKeeper(), TEST_HRI.getEncodedName()));
+ assertEquals(EventType.RS_ZK_REGION_FAILED_OPEN, rt.getEventType());
}
@Test
@@ -173,9 +172,9 @@ public class TestOpenRegionHandler {
handler.process();
// Handler should have transitioned it to FAILED_OPEN
- RegionTransitionData data =
- ZKAssign.getData(server.getZooKeeper(), TEST_HRI.getEncodedName());
- assertEquals(EventType.RS_ZK_REGION_FAILED_OPEN, data.getEventType());
+ RegionTransition rt = RegionTransition.parseFrom(
+ ZKAssign.getData(server.getZooKeeper(), TEST_HRI.getEncodedName()));
+ assertEquals(EventType.RS_ZK_REGION_FAILED_OPEN, rt.getEventType());
}
diff --git src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
index 4314572..d081968 100644
--- src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
+++ src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.Delete;
@@ -59,7 +60,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -920,9 +920,9 @@ public class TestHBaseFsck {
int iTimes = 0;
while (true) {
- RegionTransitionData rtd = ZKAssign.getData(zkw,
- region.getEncodedName());
- if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+ RegionTransition rt = RegionTransition.parseFrom(ZKAssign.getData(zkw,
+ region.getEncodedName()));
+ if (rt != null && rt.getEventType() == EventType.RS_ZK_REGION_OPENED) {
break;
}
Thread.sleep(100);