Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1165482)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy)
@@ -765,7 +765,7 @@
     try {
       Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
       deleteMap.put(family, kvs);
-      region.delete(deleteMap, true);
+      region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true);
     } catch (Exception e) {
       assertTrue("Family " +new String(family)+ " does not exist", false);
     }
@@ -776,7 +776,7 @@
     try {
       Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
       deleteMap.put(family, kvs);
-      region.delete(deleteMap, true);
+      region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true);
     } catch (Exception e) {
       ok = true;
     }
@@ -1042,7 +1042,7 @@
     Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
     deleteMap.put(fam1, kvs);
-    region.delete(deleteMap, true);
+    region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true);
     // extract the key values out the memstore:
     // This is kinda hacky, but better than nothing...
Index: src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java (revision 0)
@@ -0,0 +1,336 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestMasterReplication { + + private static final Log LOG = LogFactory.getLog(TestReplication.class); + + private static Configuration conf1; + private static Configuration conf2; + + private static ReplicationAdmin admin1; + private static ReplicationAdmin admin2; + private static String clusterKey2; + private static String clusterKey1; + + private static HTable htable1; + private static HTable htable2; + + private static HBaseTestingUtility utility1; + private static HBaseTestingUtility utility2; + private static final int NB_ROWS_IN_BATCH = 100; + private static final int NB_ROWS_IN_BIG_BATCH = + NB_ROWS_IN_BATCH * 10; + private static final long SLEEP_TIME = 500; + private static final int NB_RETRIES = 10; + + private static final byte[] tableName = Bytes.toBytes("test"); + private static final byte[] famName = Bytes.toBytes("f"); + private static final byte[] row = Bytes.toBytes("row"); + private static final byte[] row1 = Bytes.toBytes("row1"); + private static final byte[] noRepfamName = Bytes.toBytes("norep"); + + private static final byte[] reset = Bytes.toBytes("reset"); + private static final byte[] count = Bytes.toBytes("count"); + private static final byte[] put = Bytes.toBytes("put"); + private static final byte[] delete = Bytes.toBytes("delete"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1 = HBaseConfiguration.create(); + conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + // smaller block size and capacity to trigger more operations + // and test them + conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20); + conf1.setInt("replication.source.size.capacity", 1024); + conf1.setLong("replication.source.sleepforretries", 100); + conf1.setInt("hbase.regionserver.maxlogs", 10); + 
conf1.setLong("hbase.master.logcleaner.ttl", 10); + conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); + conf1.setBoolean("dfs.support.append", true); + conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); + + utility1 = new HBaseTestingUtility(conf1); + utility1.startMiniZKCluster(); + MiniZooKeeperCluster miniZK = utility1.getZkCluster(); + new ZooKeeperWatcher(conf1, "cluster1", null, true); + admin1 = new ReplicationAdmin(conf1); + LOG.info("Setup first Zk"); + + conf2 = new Configuration(conf1); + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + + utility2 = new HBaseTestingUtility(conf2); + utility2.setZkCluster(miniZK); + new ZooKeeperWatcher(conf2, "cluster2", null, true); + + admin2 = new ReplicationAdmin(conf2); + + clusterKey1 = conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" + + conf1.get("hbase.zookeeper.property.clientPort")+":/1"; + + clusterKey2 = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" + + conf2.get("hbase.zookeeper.property.clientPort")+":/2"; + + // set M-M + admin1.addPeer("2", clusterKey2); + admin2.addPeer("2", clusterKey1); + + LOG.info("Setup second Zk"); + utility1.startMiniCluster(2); + utility2.startMiniCluster(2); + HTableDescriptor table = new HTableDescriptor(tableName); + HColumnDescriptor fam = new HColumnDescriptor(famName); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + table.addFamily(fam); + fam = new HColumnDescriptor(noRepfamName); + table.addFamily(fam); + HBaseAdmin admin1 = new HBaseAdmin(conf1); + HBaseAdmin admin2 = new HBaseAdmin(conf2); + admin1.createTable(table); + admin2.createTable(table); + htable1 = new HTable(conf1, tableName); + htable1.setWriteBufferSize(1024); + htable2 = new HTable(conf2, tableName); + htable2.setWriteBufferSize(1024); + setIsReplication(true); + } + + private static void setIsReplication(boolean rep) throws Exception { + LOG.info("Set rep " + rep); + admin1.setReplicating(rep); + admin2.setReplicating(rep); + Thread.sleep(SLEEP_TIME); + } + + @Before + public void setUp() throws Exception { + + // Starting and stopping replication can make us miss new logs, + // rolling like this makes sure the most recent one gets added to the queue + for ( JVMClusterUtil.RegionServerThread r : + utility1.getHBaseCluster().getRegionServerThreads()) { + r.getRegionServer().getWAL().rollWriter(); + } + utility1.truncateTable(tableName); + // truncating the table will send one Delete per row to the slave cluster + // in an async fashion, which is why we cannot just call truncateTable on + // utility2 since late writes could make it to the slave in some way. + // Instead, we truncate the first table and wait for all the Deletes to + // make it to the slave. 
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment timeout if we make progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+  /**
+   * Add a row to a table in each cluster, check it's replicated,
+   * delete it, check it's gone.
+   * Also check that the puts and deletes are not replicated back to
+   * the originating cluster.
+   */
+  @Test(timeout=300000)
+  public void testSimplePutDelete() throws Exception {
+    /*
+     * needs HBASE-4331
+    resetCount(htable1);
+    resetCount(htable2);
+    */
+    // add rows to both clusters,
+    // make sure they are both replicated
+    putAndWait(row, famName, htable1, htable2);
+    putAndWait(row1, famName, htable2, htable1);
+
+    // make sure "row" did not get replicated back.
+    assertEquals("Puts were replicated back ", 2, getCount(htable1, put));
+
+    // delete "row" and wait
+    deleteAndWait(row, htable1, htable2);
+
+    // make sure puts on the 2nd cluster were not replicated back
+    assertEquals("Puts were replicated back ", 2, getCount(htable2, put));
+
+    deleteAndWait(row1, htable2, htable1);
+
+    assertEquals("Deletes were replicated back ", 2, getCount(htable1, delete));
+  }
+
+  private void resetCount(HTable t) throws IOException {
+    Put p = new Put(row);
+    p.add(reset, reset, new byte[]{});
+    t.put(p);
+  }
+
+  private int getCount(HTable t, byte[] type) throws IOException {
+    Get test = new Get(row);
+    test.setAttribute("count", new byte[]{});
+    Result res = t.get(test);
+    return Bytes.toInt(res.getValue(count, type));
+  }
+
+  private void deleteAndWait(byte[] row, HTable source, HTable target)
+  throws Exception {
+    Delete del = new Delete(row);
+    source.delete(del);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      Result res = target.get(get);
+      if (res.size() >= 1) {
+        LOG.info("Row not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  private void putAndWait(byte[] row, byte[] fam, HTable source, HTable target)
+  throws Exception {
+    Put put = new Put(row);
+    put.add(fam, row, row);
+    source.put(put);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = target.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        break;
+      }
+    }
+  }
+
+  /**
+   * Use a coprocessor to count puts and deletes.
+   * As KVs would be replicated back with the same timestamp,
+   * there is otherwise no way to count them.
+   */
+  public static class CoprocessorCounter extends BaseRegionObserver {
+    private int nCount = 0;
+    private int nDelete = 0;
+
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
+        throws IOException {
+      if (familyMap.containsKey(reset)) {
+        nCount = 0;
+        nDelete = 0;
+        e.bypass();
+      } else {
+        nCount++;
+      }
+    }
+    @Override
+    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
+        final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
+        throws IOException {
+      nDelete++;
+    }
+    @Override
+    public void preGet(final ObserverContext<RegionCoprocessorEnvironment> c,
+        final Get get, final List<KeyValue> result) throws IOException {
+      if (get.getAttribute("count") != null) {
+        result.clear();
+        // order is important!
+        result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
+        result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
+        c.bypass();
+      }
+    }
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1165482)
+++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy)
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 import java.util.regex.Pattern;
 
 /**
@@ -203,6 +204,12 @@
   /** Configuration key storing the cluster ID */
   public static final String CLUSTER_ID = "hbase.cluster.id";
 
+  /**
+   * Attribute used in Puts and Deletes to indicate the originating
+   * cluster.
+   */
+  public static final String CLUSTER_ID_ATTR = "_c.id_";
+
   // Always store the location of the root table's HRegion.
   // This HRegion is never split.
@@ -364,7 +371,7 @@
    * Default cluster ID, cannot be used to identify a cluster so a key with
    * this value means it wasn't meant for replication.
    */
-  public static final byte DEFAULT_CLUSTER_ID = 0;
+  public static final UUID DEFAULT_CLUSTER_ID = new UUID(0L,0L);
 
   /**
    * Parameter name for maximum number of bytes returned when calling a
Index: src/main/java/org/apache/hadoop/hbase/client/Delete.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Delete.java (revision 1165482)
+++ src/main/java/org/apache/hadoop/hbase/client/Delete.java (working copy)
@@ -37,6 +37,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 
 /**
  * Used to perform Delete operations on a single row.
@@ -513,4 +514,26 @@
   public void setWriteToWAL(boolean write) {
     this.writeToWAL = write;
   }
+
+  /**
+   * Set the replication cluster id.
+   * @param clusterId
+   */
+  public void setClusterId(UUID clusterId) {
+    byte[] val = new byte[2*Bytes.SIZEOF_LONG];
+    Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
+    Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());
+    setAttribute(HConstants.CLUSTER_ID_ATTR, val);
+  }
+
+  /**
+   * @return The replication cluster id.
+   */
+  public UUID getClusterId() {
+    byte[] attr = getAttribute(HConstants.CLUSTER_ID_ATTR);
+    if (attr == null) {
+      return HConstants.DEFAULT_CLUSTER_ID;
+    }
+    return new UUID(Bytes.toLong(attr,0), Bytes.toLong(attr, Bytes.SIZEOF_LONG));
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/client/Put.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Put.java (revision 1165482)
+++ src/main/java/org/apache/hadoop/hbase/client/Put.java (working copy)
@@ -40,6 +40,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 
 
 /**
@@ -656,4 +657,26 @@
     byte [][] parts = KeyValue.parseColumn(column);
     return add(parts[0], parts[1], ts, value);
   }
+
+  /**
+   * Set the replication cluster id.
+   * @param clusterId
+   */
+  public void setClusterId(UUID clusterId) {
+    byte[] val = new byte[2*Bytes.SIZEOF_LONG];
+    Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
+    Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());
+    setAttribute(HConstants.CLUSTER_ID_ATTR, val);
+  }
+
+  /**
+   * @return The replication cluster id.
+   */
+  public UUID getClusterId() {
+    byte[] attr = getAttribute(HConstants.CLUSTER_ID_ATTR);
+    if (attr == null) {
+      return HConstants.DEFAULT_CLUSTER_ID;
+    }
+    return new UUID(Bytes.toLong(attr,0), Bytes.toLong(attr, Bytes.SIZEOF_LONG));
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (revision 1165482)
+++ src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (working copy)
@@ -67,7 +67,6 @@
 public class ReplicationAdmin implements Closeable {
 
   private final ReplicationZookeeper replicationZk;
-  private final Configuration configuration;
   private final HConnection connection;
 
   /**
@@ -81,7 +80,6 @@
       throw new RuntimeException("hbase.replication isn't true, please " +
           "enable it in order to use replication");
     }
-    this.configuration = conf;
     this.connection = HConnectionManager.getConnection(conf);
     ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher();
     try {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (revision 1165482)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (working copy)
@@ -25,10 +25,12 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * A Key for an entry in the change log.
@@ -41,6 +43,10 @@
  * associated row.
  */
 public class HLogKey implements WritableComparable<HLogKey> {
+  // before HBASE-2195 the cluster id byte was always 0.
+  // to be extra sure, a negative numbering scheme is used for the version.
+  private static final int VERSION = -1;
+
   // The encoded region name.
   private byte [] encodedRegionName;
   private byte [] tablename;
@@ -48,7 +54,7 @@
   // Time at which this edit was written.
   private long writeTime;
 
-  private byte clusterId;
+  private UUID clusterId;
 
   /** Writable Consructor -- Do not use. */
   public HLogKey() {
@@ -105,7 +111,7 @@
    * Get the id of the original cluster
    * @return Cluster id.
*/ - public byte getClusterId() { + public UUID getClusterId() { return clusterId; } @@ -113,7 +119,7 @@ * Set the cluster id of this key * @param clusterId */ - public void setClusterId(byte clusterId) { + public void setClusterId(UUID clusterId) { this.clusterId = clusterId; } @@ -154,7 +160,7 @@ int result = Bytes.hashCode(this.encodedRegionName); result ^= this.logSeqNum; result ^= this.writeTime; - result ^= this.clusterId; + result ^= this.clusterId.hashCode(); return result; } @@ -174,6 +180,7 @@ } } } + // why isn't cluster id accounted for? return result; } @@ -204,22 +211,47 @@ } public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, VERSION); Bytes.writeByteArray(out, this.encodedRegionName); Bytes.writeByteArray(out, this.tablename); out.writeLong(this.logSeqNum); out.writeLong(this.writeTime); - out.writeByte(this.clusterId); + // avoid storing 16 bytes when replication is not enabled + if (this.clusterId == HConstants.DEFAULT_CLUSTER_ID) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeLong(this.clusterId.getMostSignificantBits()); + out.writeLong(this.clusterId.getLeastSignificantBits()); + } } public void readFields(DataInput in) throws IOException { - this.encodedRegionName = Bytes.readByteArray(in); + int version = 0; + // @see Bytes#readByteArray(DataInput) + int len = WritableUtils.readVInt(in); + if (len < 0) { + // what we just read was the version + version = len; + len = WritableUtils.readVInt(in); + } + this.encodedRegionName = new byte[len]; + in.readFully(this.encodedRegionName); this.tablename = Bytes.readByteArray(in); this.logSeqNum = in.readLong(); this.writeTime = in.readLong(); - try { - this.clusterId = in.readByte(); - } catch(EOFException e) { - // Means it's an old key, just continue + this.clusterId = HConstants.DEFAULT_CLUSTER_ID; + if (version < 0) { + if (in.readBoolean()) { + this.clusterId = new UUID(in.readLong(), in.readLong()); + } + } else { + try { + // dummy read (former byte cluster id) + in.readByte(); + } catch(EOFException e) { + // Means it's a very old key, just continue + } } } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1165482) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -37,6 +37,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -970,35 +971,66 @@ public void append(HRegionInfo info, byte [] tableName, WALEdit edits, final long now, HTableDescriptor htd) throws IOException { - if (edits.isEmpty()) return; - if (this.closed) { - throw new IOException("Cannot append; log is closed"); - } - synchronized (this.updateLock) { - long seqNum = obtainSeqNum(); - // The 'lastSeqWritten' map holds the sequence number of the oldest - // write for each region (i.e. the first edit added to the particular - // memstore). . When the cache is flushed, the entry for the - // region being flushed is removed if the sequence number of the flush - // is greater than or equal to the value in lastSeqWritten. - // Use encoded name. Its shorter, guaranteed unique and a subset of - // actual name. 
- byte [] hriKey = info.getEncodedNameAsBytes(); - this.lastSeqWritten.putIfAbsent(hriKey, seqNum); - HLogKey logKey = makeKey(hriKey, tableName, seqNum, now); - doWrite(info, logKey, edits, htd); - this.numEntries.incrementAndGet(); - } - // Sync if catalog region, and if not then check if that table supports - // deferred log flushing - if (info.isMetaRegion() || - !htd.isDeferredLogFlush()) { - // sync txn to file system - this.sync(); - } + append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd); } /** + * Append a set of edits to the log. Log edits are keyed by (encoded) + * regionName, rowname, and log-sequence-id. + * + * Later, if we sort by these keys, we obtain all the relevant edits for a + * given key-range of the HRegion (TODO). Any edits that do not have a + * matching COMPLETE_CACHEFLUSH message can be discarded. + * + *

+   * Logs cannot be restarted once closed, or once the HLog process dies. Each
+   * time the HLog starts, it must create a new log. This means that other
+   * systems should process the log appropriately upon each startup (and prior
+   * to initializing HLog).
+   *
+   * synchronized prevents appends during the completion of a cache flush or for
+   * the duration of a log roll.
+   *
+   * @param info
+   * @param tableName
+   * @param edits
+   * @param clusterId The originating clusterId for this edit (for replication)
+   * @param now
+   * @throws IOException
+   */
+  public void append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
+    final long now, HTableDescriptor htd)
+  throws IOException {
+    if (edits.isEmpty()) return;
+    if (this.closed) {
+      throw new IOException("Cannot append; log is closed");
+    }
+    synchronized (this.updateLock) {
+      long seqNum = obtainSeqNum();
+      // The 'lastSeqWritten' map holds the sequence number of the oldest
+      // write for each region (i.e. the first edit added to the particular
+      // memstore). When the cache is flushed, the entry for the
+      // region being flushed is removed if the sequence number of the flush
+      // is greater than or equal to the value in lastSeqWritten.
+      // Use encoded name. It's shorter, guaranteed unique and a subset of
+      // actual name.
+      byte [] hriKey = info.getEncodedNameAsBytes();
+      this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
+      HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
+      logKey.setClusterId(clusterId);
+      doWrite(info, logKey, edits, htd);
+      this.numEntries.incrementAndGet();
+    }
+    // Sync if catalog region, and if not then check if that table supports
+    // deferred log flushing
+    if (info.isMetaRegion() ||
+        !htd.isDeferredLogFlush()) {
+      // sync txn to file system
+      this.sync();
+    }
+  }
+
+  /**
   * This thread is responsible to call syncFs and buffer up the writers while
   * it happens.
   */
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1165482)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -39,6 +39,7 @@
 import java.util.NavigableSet;
 import java.util.Random;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
@@ -1404,7 +1405,7 @@
       try {
         // All edits for the given row (across all column families) must happen atomically.
         prepareDelete(delete);
-        delete(delete.getFamilyMap(), writeToWAL);
+        delete(delete.getFamilyMap(), delete.getClusterId(), writeToWAL);
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -1413,14 +1414,13 @@
     }
   }
 
-
   /**
   * @param familyMap map of family to edits for the given family.
   * @param writeToWAL
   * @throws IOException
   */
-  public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
-      throws IOException {
+  public void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
+      boolean writeToWAL) throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
     if (coprocessorHost != null) {
       if (coprocessorHost.preDelete(familyMap, writeToWAL)) {
@@ -1490,7 +1490,7 @@
       WALEdit walEdit = new WALEdit();
       addFamilyMapToWALEdit(familyMap, walEdit);
       this.log.append(regionInfo, this.htableDescriptor.getName(),
-          walEdit, now, this.htableDescriptor);
+          walEdit, clusterId, now, this.htableDescriptor);
     }
 
     // Now make changes to the memstore.
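[Editor's note, not part of the patch] The Put/Delete changes above store the originating cluster id as a 16-byte attribute (two big-endian longs under HConstants.CLUSTER_ID_ATTR), and HRegion simply forwards that id into HLog.append. A minimal sketch of the round trip through the new client API follows; the class name is hypothetical, and only setClusterId/getClusterId/getAttribute from this patch are assumed to exist:

import java.util.UUID;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class ClusterIdAttributeSketch {
  public static void main(String[] args) {
    Put put = new Put(Bytes.toBytes("row"));
    // No attribute set yet: the Put reports the default (non-replicated) id.
    assert HConstants.DEFAULT_CLUSTER_ID.equals(put.getClusterId());

    UUID origin = UUID.randomUUID();
    put.setClusterId(origin);  // stored under the "_c.id_" attribute
    // Decode the raw attribute bytes the same way getClusterId() does:
    byte[] raw = put.getAttribute(HConstants.CLUSTER_ID_ATTR);
    UUID decoded = new UUID(Bytes.toLong(raw, 0),
        Bytes.toLong(raw, Bytes.SIZEOF_LONG));
    assert origin.equals(decoded) && origin.equals(put.getClusterId());
  }
}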
@@ -1565,7 +1565,7 @@
       try {
         // All edits for the given row (across all column families) must happen atomically.
         // Coprocessor interception happens in put(Map,boolean)
-        put(put.getFamilyMap(), writeToWAL);
+        put(put.getFamilyMap(), put.getClusterId(), writeToWAL);
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -1760,8 +1760,9 @@
       }
 
       // Append the edit to WAL
+      Put first = batchOp.operations[firstIndex].getFirst();
       this.log.append(regionInfo, this.htableDescriptor.getName(),
-          walEdit, now, this.htableDescriptor);
+          walEdit, first.getClusterId(), now, this.htableDescriptor);
 
       // ------------------------------------
       // STEP 4. Write back to memstore
@@ -1893,11 +1894,11 @@
       if (matches) {
         // All edits for the given row (across all column families) must happen atomically.
         if (isPut) {
-          put(((Put)w).getFamilyMap(), writeToWAL);
+          put(((Put)w).getFamilyMap(), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
         } else {
           Delete d = (Delete)w;
           prepareDelete(d);
-          delete(d.getFamilyMap(), writeToWAL);
+          delete(d.getFamilyMap(), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
         }
         return true;
       }
@@ -1988,7 +1989,7 @@
     familyMap = new HashMap<byte[], List<KeyValue>>();
     familyMap.put(family, edits);
-    this.put(familyMap, true);
+    this.put(familyMap, HConstants.DEFAULT_CLUSTER_ID, true);
   }
 
   /**
@@ -1998,8 +1999,8 @@
    * @param writeToWAL if true, then we should write to the log
    * @throws IOException
    */
-  private void put(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
-      throws IOException {
+  private void put(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
+      boolean writeToWAL) throws IOException {
     /* run pre put hook outside of lock to avoid deadlock */
     if (coprocessorHost != null) {
       if (coprocessorHost.prePut(familyMap, writeToWAL)) {
@@ -2024,7 +2025,7 @@
       WALEdit walEdit = new WALEdit();
       addFamilyMapToWALEdit(familyMap, walEdit);
       this.log.append(regionInfo, this.htableDescriptor.getName(),
-          walEdit, now, this.htableDescriptor);
+          walEdit, clusterId, now, this.htableDescriptor);
     }
 
     long addedSize = applyFamilyMapToMemstore(familyMap);
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1165482)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy)
@@ -30,6 +30,7 @@
 import java.util.NavigableMap;
 import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -55,6 +56,7 @@
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ClusterId;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
@@ -88,7 +90,7 @@
   // should we replicate or not?
   private AtomicBoolean replicating;
   // id of the peer cluster this source replicates to
-  private String peerClusterId;
+  private String peerId;
   // The manager of all sources to which we ping back our progress
   private ReplicationSourceManager manager;
   // Should we stop everything?
@@ -109,7 +111,9 @@
   private volatile Path currentPath;
   private FileSystem fs;
   // id of this cluster
-  private byte clusterId;
+  private UUID clusterId;
+  // id of the other cluster
+  private UUID peerClusterId;
   // total number of edits we replicated
   private long totalReplicatedEdits = 0;
   // The znode we currently play with
@@ -176,9 +180,15 @@
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.fs = fs;
-    this.clusterId = Byte.valueOf(zkHelper.getClusterId());
     this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
 
+    try {
+      this.clusterId = UUID.fromString(ClusterId.readClusterIdZNode(zkHelper
+          .getZookeeperWatcher()));
+    } catch (KeeperException ke) {
+      throw new IOException("Could not read cluster id", ke);
+    }
+
     // Finally look if this is a recovered queue
     this.checkIfQueueRecovered(peerClusterZnode);
   }
@@ -188,7 +198,7 @@
   private void checkIfQueueRecovered(String peerClusterZnode) {
     String[] parts = peerClusterZnode.split("-");
     this.queueRecovered = parts.length != 1;
-    this.peerClusterId = this.queueRecovered ?
+    this.peerId = this.queueRecovered ?
         parts[0] : peerClusterZnode;
     this.peerClusterZnode = peerClusterZnode;
     this.deadRegionServers = new String[parts.length-1];
@@ -204,11 +214,11 @@
   private void chooseSinks() throws KeeperException {
     this.currentPeers.clear();
     List<HServerAddress> addresses =
-        this.zkHelper.getSlavesAddresses(peerClusterId);
+        this.zkHelper.getSlavesAddresses(peerId);
     Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
     LOG.info("Getting " + nbPeers +
-        " rs from peer cluster # " + peerClusterId);
+        " rs from peer cluster # " + peerId);
     for (int i = 0; i < nbPeers; i++) {
       HServerAddress address;
       // Make sure we get one address that we don't already have
@@ -235,6 +245,15 @@
     if (this.stopper.isStopped()) {
       return;
     }
+    // delay this until we are in an asynchronous thread
+    try {
+      this.peerClusterId = UUID.fromString(ClusterId
+          .readClusterIdZNode(zkHelper.getPeerClusters().get(peerId).getZkw()));
+    } catch (KeeperException ke) {
+      throw new RuntimeException("Could not read peers cluster ids", ke);
+    }
+    LOG.info(clusterId + " -> " + peerClusterId);
+
     // If this is recovered, the queue is already full and the first log
     // normally has a position (unless the RS failed between 2 logs)
     if (this.queueRecovered) {
@@ -350,7 +369,7 @@
         LOG.debug("Attempt to close connection failed", e);
       }
     }
-    LOG.debug("Source exiting " + peerClusterId);
+    LOG.debug("Source exiting " + peerId);
   }
 
   /**
@@ -371,18 +390,27 @@
       this.metrics.logEditsReadRate.inc(1);
       seenEntries++;
       // Remove all KVs that should not be replicated
-      removeNonReplicableEdits(edit);
       HLogKey logKey = entry.getKey();
-      // Don't replicate catalog entries, if the WALEdit wasn't
-      // containing anything to replicate and if we're currently not set to replicate
-      if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
-          Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
-          edit.size() != 0 && replicating.get()) {
-        logKey.setClusterId(this.clusterId);
-        currentNbOperations += countDistinctRowKeys(edit);
-        currentNbEntries++;
-      } else {
-        this.metrics.logEditsFilteredRate.inc(1);
+      // don't replicate if the log entries originated in the peer
+      if (!logKey.getClusterId().equals(peerClusterId)) {
+        removeNonReplicableEdits(edit);
+        // Don't replicate catalog entries, if the WALEdit wasn't
+        // containing anything to replicate and if we're currently not set to replicate
+        if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
+            Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
+            edit.size() != 0 && replicating.get()) {
+          // Only set the clusterId if it is a local key.
+          // This ensures that the originator sets the cluster id
+          // and all replicas retain the initial cluster id.
+          // This is the *only* place where a cluster id other than the default is set.
+          if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) {
+            logKey.setClusterId(this.clusterId);
+          }
+          currentNbOperations += countDistinctRowKeys(edit);
+          currentNbEntries++;
+        } else {
+          this.metrics.logEditsFilteredRate.inc(1);
+        }
       }
       // Stop if too many entries or too big
       if ((this.reader.getPosition() - this.position)
@@ -706,7 +734,7 @@
   }
 
   public String getPeerClusterId() {
-    return this.peerClusterId;
+    return this.peerId;
   }
 
   public Path getCurrentPath() {
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (revision 1165482)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (working copy)
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Delete;
@@ -106,6 +107,7 @@
         if (kvs.get(0).isDelete()) {
           Delete delete = new Delete(kvs.get(0).getRow(),
               kvs.get(0).getTimestamp(), null);
+          delete.setClusterId(entry.getKey().getClusterId());
           for (KeyValue kv : kvs) {
             if (kv.isDeleteFamily()) {
               delete.deleteFamily(kv.getFamily());
@@ -126,10 +128,12 @@
         byte[] lastKey = kvs.get(0).getRow();
         Put put = new Put(kvs.get(0).getRow(),
             kvs.get(0).getTimestamp());
+        put.setClusterId(entry.getKey().getClusterId());
         for (KeyValue kv : kvs) {
           if (!Bytes.equals(lastKey, kv.getRow())) {
             tableList.add(put);
             put = new Put(kv.getRow(), kv.getTimestamp());
+            put.setClusterId(entry.getKey().getClusterId());
           }
           put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
           lastKey = kv.getRow();
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1165482)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy)
@@ -93,8 +93,6 @@
   private final Configuration conf;
 
   // Is this cluster replicating at the moment?
private AtomicBoolean replicating; - // Byte (stored as string here) that identifies this cluster - private String clusterId; // The key to our own cluster private String ourClusterKey; // Abortable @@ -146,12 +144,8 @@ conf.get("zookeeper.znode.replication", "replication"); String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); - String repMasterZNodeName = - conf.get("zookeeper.znode.replication.master", "master"); this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state"); - String clusterIdZNodeName = - conf.get("zookeeper.znode.replication.clusterId", "clusterId"); String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf); @@ -162,11 +156,6 @@ this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName); ZKUtil.createWithParents(this.zookeeper, this.rsZNode); - String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName); - byte [] data = ZKUtil.getData(this.zookeeper, znode); - String idResult = Bytes.toString(data); - this.clusterId = idResult == null? - Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult; // Set a tracker on replicationStateNodeNode this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable); @@ -702,15 +691,6 @@ } /** - * Get the identification of the cluster - * - * @return the id for the cluster - */ - public String getClusterId() { - return this.clusterId; - } - - /** * Get a map of all peer clusters * @return map of peer cluster keyed by id */
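
[Editor's note, not part of the patch] Taken together, the ReplicationSource changes implement the loop-prevention rule that makes master-master replication possible: an edit is shipped to a peer only if it did not originate there, and the originating cluster stamps its id exactly once. A condensed restatement follows, using a hypothetical helper method; the real logic lives inline in ReplicationSource.readAllEntriesToReplicate and additionally filters catalog tables, empty WALEdits, and the replicating flag:

boolean shouldShipToPeer(HLogKey logKey, UUID localClusterId, UUID peerClusterId) {
  if (logKey.getClusterId().equals(peerClusterId)) {
    // The edit originated in the peer: never ship it back; this breaks the cycle.
    return false;
  }
  if (HConstants.DEFAULT_CLUSTER_ID.equals(logKey.getClusterId())) {
    // A locally written edit: stamp it once with the originator's id so that
    // every downstream cluster can tell where it came from.
    logKey.setClusterId(localClusterId);
  }
  // Edits forwarded from a third cluster keep their original cluster id.
  return true;
}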