Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java (working copy)
@@ -0,0 +1,107 @@
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestZooKeeperScanPolicyObserver {
+  private static final Log LOG = LogFactory.getLog(TestZooKeeperScanPolicyObserver.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] F = Bytes.toBytes("fam");
+  private static final byte[] Q = Bytes.toBytes("qual");
+  private static final byte[] R = Bytes.toBytes("row");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Test we can first start the ZK cluster by itself
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        ZooKeeperScanPolicyObserver.class.getName());
+    TEST_UTIL.startMiniZKCluster();
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testScanPolicyObserver() throws Exception {
+    byte[] tableName = Bytes.toBytes("testScanPolicyObserver");
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(F)
+        .setMaxVersions(10)
+        .setTimeToLive(1);
+    desc.addFamily(hcd);
+    TEST_UTIL.getHBaseAdmin().createTable(desc);
+    HTable t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+
+    ZooKeeperWatcher zkw = HConnectionManager.getConnection(TEST_UTIL.getConfiguration())
+        .getZooKeeperWatcher();
+    ZooKeeper zk = zkw.getRecoverableZooKeeper().getZooKeeper();
+    ZKUtil.createWithParents(zkw, ZooKeeperScanPolicyObserver.node);
+    // let's say the last backup was 1h ago
+    // using plain ZK here, because RecoverableZooKeeper adds extra encoding to the data
+    zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now - 3600*1000), -1);
+
+    LOG.debug("Set time: "+Bytes.toLong(Bytes.toBytes(now - 3600*1000)));
+
+    long ts = now - 2000;
+    Put p = new Put(R);
+    p.add(F, Q, ts, Q);
+    t.put(p);
+    p = new Put(R);
+    p.add(F, Q, ts+1, Q);
+    t.put(p);
+
+    // these two should be expired but for the override
+    // (their ts was 2s in the past, and the family TTL is 1s)
+    Get g = new Get(R);
+    g.setMaxVersions(10);
+    Result r = t.get(g);
+    // still there?
+    assertEquals(2, r.size());
+
+    TEST_UTIL.flush(tableName);
+    TEST_UTIL.compact(tableName, true);
+
+    g = new Get(R);
+    g.setMaxVersions(10);
+    r = t.get(g);
+    // still there?
+    assertEquals(2, r.size());
+
+    // pretend a backup just completed; the old cells are no longer protected
+    zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now), -1);
+    LOG.debug("Set time: "+now);
+
+    TEST_UTIL.compact(tableName, true);
+
+    g = new Get(R);
+    g.setMaxVersions(10);
+    r = t.get(g);
+    // should be gone now
+    assertEquals(0, r.size());
+    t.close();
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java (working copy)
@@ -0,0 +1,213 @@
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * This is an example showing how a RegionObserver could be configured
+ * via ZooKeeper in order to control a region's compaction, flush, and scan policy.
+ *
+ * This also demonstrates the use of shared {@link RegionObserver} state.
+ * See {@link RegionCoprocessorEnvironment#getSharedData()}.
+ *
+ * This would be useful for an incremental backup tool, which would indicate the last
+ * time of a successful backup via ZK and instruct HBase to not delete data that was
+ * inserted since (based on wall clock time).
+ *
+ * This implements org.apache.zookeeper.Watcher directly instead of using
+ * {@link ZooKeeperWatcher}, because RegionObservers come and go and currently
+ * listeners registered with ZooKeeperWatcher cannot be removed.
+ */
+public class ZooKeeperScanPolicyObserver extends BaseRegionObserver {
+  public static String node = "/backup/example/lastbackup";
+  public static String zkkey = "ZK";
+  private static final Log LOG = LogFactory.getLog(ZooKeeperScanPolicyObserver.class);
+
+  /**
+   * Internal watcher that keeps "data" up to date asynchronously.
+   */
+  private static class ZKWatcher implements Watcher {
+    private byte[] data = null;
+    private ZooKeeper zk;
+    private volatile boolean needSetup = true;
+    private volatile long lastSetupTry = 0;
+
+    public ZKWatcher(ZooKeeper zk) {
+      this.zk = zk;
+      // trigger the listening
+      getData();
+    }
+
+    /**
+     * Get the maintained data. In case of any ZK exceptions this will retry
+     * establishing the connection (but not more than twice/minute).
+     *
+     * getData is on the critical path, so make sure it is fast unless there is
+     * a problem (network partition, ZK ensemble down, etc).
+     * Make sure at most one (unlucky) thread retries, and that other threads
+     * don't pile up while that thread tries to recreate the connection.
+     *
+     * @return the last known version of the data
+     */
+    public byte[] getData() {
+      // try at most twice/minute
+      if (needSetup && EnvironmentEdgeManager.currentTimeMillis() > lastSetupTry + 30000) {
+        synchronized (this) {
+          // make sure only one thread tries to reconnect
+          if (needSetup) {
+            needSetup = false;
+          } else {
+            return data;
+          }
+        }
+        // do this without the lock held to avoid threads piling up on this lock,
+        // as it can take a while
+        try {
+          LOG.debug("Connecting to ZK");
+          // record this attempt
+          lastSetupTry = EnvironmentEdgeManager.currentTimeMillis();
+          if (zk.exists(node, false) != null) {
+            data = zk.getData(node, this, null);
+            LOG.debug("Read synchronously: "+(data == null ? "null" : Bytes.toLong(data)));
+          } else {
+            zk.exists(node, this);
+          }
+        } catch (Exception x) {
+          // try again if this fails
+          needSetup = true;
+        }
+      }
+      return data;
+    }
+
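+    // Note: ZooKeeper watches are one-shot. Each getData()/exists() call above
+    // re-registers this instance as the watcher, so process() below must re-arm
+    // the watch on every notification in order to keep receiving updates.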
"null" : Bytes.toLong(data))); + } catch (InterruptedException ix) { + } catch (KeeperException kx) { + needSetup = true; + } + break; + + case NodeDeleted: + try { + // just re-watch + zk.exists(node, this); + data = null; + } catch (InterruptedException ix) { + } catch (KeeperException kx) { + needSetup = true; + } + break; + + default: + // ignore + } + } + } + + @Override + public void start(CoprocessorEnvironment e) throws IOException { + RegionCoprocessorEnvironment re = (RegionCoprocessorEnvironment) e; + if (!re.getSharedData().containsKey(zkkey)) { + // there is a short race here + // in the worst case we create a watcher that will be notified once + re.getSharedData().putIfAbsent( + zkkey, + new ZKWatcher(re.getRegionServerServices().getZooKeeper() + .getRecoverableZooKeeper().getZooKeeper())); + } + } + + @Override + public void stop(CoprocessorEnvironment e) throws IOException { + // nothing to do here + } + + protected ScanInfo getScanInfo(Store store, RegionCoprocessorEnvironment e) { + byte[] data = ((ZKWatcher)e.getSharedData().get(zkkey)).getData(); + if (data == null) { + return null; + } + ScanInfo oldSI = store.getScanInfo(); + if (oldSI.getTtl() == Long.MAX_VALUE) { + return null; + } + long ttl = Math.max(EnvironmentEdgeManager.currentTimeMillis() - Bytes.toLong(data), oldSI.getTtl()); + return new ScanInfo(store.getFamily(), ttl, + oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + } + + @Override + public InternalScanner preFlushScannerOpen(final ObserverContext c, + Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment()); + if (scanInfo == null) { + // take default action + return null; + } + Scan scan = new Scan(); + scan.setMaxVersions(scanInfo.getMaxVersions()); + return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), + ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(), + HConstants.OLDEST_TIMESTAMP); + } + + @Override + public InternalScanner preCompactScannerOpen(final ObserverContext c, + Store store, List scanners, ScanType scanType, long earliestPutTs, + InternalScanner s) throws IOException { + Store.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment()); + if (scanInfo == null) { + // take default action + return null; + } + Scan scan = new Scan(); + scan.setMaxVersions(scanInfo.getMaxVersions()); + return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion() + .getSmallestReadPoint(), earliestPutTs); + } + + @Override + public KeyValueScanner preStoreScannerOpen(final ObserverContext c, + final Store store, final Scan scan, final NavigableSet targetCols, + final KeyValueScanner s) throws IOException { + Store.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment()); + if (scanInfo == null) { + // take default action + return null; + } + return new StoreScanner(store, scanInfo, scan, targetCols); + } +}