Index: src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java	(revision 158995)
+++ src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java	(working copy)
@@ -199,4 +199,32 @@
       this.connection.close();
     }
   }
+
+  /**
+   * Sets the timestamp filter of a peer cluster.
+   * @param clusterId id of the peer cluster
+   * @param timeStamp filter value, a long in decimal notation
+   * @throws NumberFormatException if timeStamp is not a parsable long
+   */
+  public void setTimeStampFilter(String clusterId, String timeStamp){
+    this.replicationZk.SetTimeStampFilter(clusterId, Long.parseLong(timeStamp));
+  }
+
+  /**
+   * Gets the timestamp filter of a peer cluster.
+   * @param clusterId id of the peer cluster
+   * @return the filter value rendered as a decimal string
+   */
+  public String getTimeStampFilter(String clusterId){
+    long timeStamp = this.replicationZk.getTimeStampFilter(clusterId);
+    return Long.toString(timeStamp);
+  }
+
+  /**
+   * Drops the timestamp filter of a peer cluster.
+   * @param clusterId id of the peer cluster
+   */
+  public void dropTimeStampFilter(String clusterId){
+    this.replicationZk.dropTimeStampFilter(clusterId);
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(revision 158995)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(working copy)
@@ -136,6 +136,10 @@
   private volatile boolean running = true;
   // Metrics for this source
   private ReplicationSourceMetrics metrics;
+
+  // KVs whose timestamp is lower than this value are not replicated;
+  // the value is re-read from ZooKeeper before the next log is picked.
+  private long filterTimeStamp = HConstants.ZERO_L;
 
   /**
    * Instantiation method used by region servers
@@ -275,6 +279,8 @@
         }
         continue;
       }
+      // Re-read the filter so a value changed from the shell is picked up
+      this.filterTimeStamp = this.zkHelper.getTimeStampFilter(this.peerId);
       // Get a new path
       if (!getNextPath()) {
         if (sleepForRetries("No log to process", sleepMultiplier)) {
@@ -565,6 +571,14 @@
       if (scopes == null || !scopes.containsKey(kv.getFamily())) {
         kvs.remove(i);
-      }
+      } else if (kv.getTimestamp() < this.filterTimeStamp) {
+        // Chained with else: a KV already dropped by the scope check must
+        // not trigger a second remove(i), which would hit another KV.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("KV is filterd casue kv time stamp is lower than set."
+              + kv.getTimestamp()+ " vs "+ this.filterTimeStamp);
+        }
+        kvs.remove(i);
+      }
     }
   }
 
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(revision 158995)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(working copy)
@@ -837,6 +837,73 @@
     return peersZNode;
   }
 
+  /**
+   * Builds the path of the znode that holds the timestamp filter of a peer.
+   * @param clusterId id of the peer cluster
+   * @return path of the "timefilter" child of the peer's znode
+   */
+  private String getTimeFilterZNode(String clusterId) {
+    String clusterNode = ZKUtil.joinZNode(this.peersZNode, clusterId);
+    return ZKUtil.joinZNode(clusterNode, "timefilter");
+  }
+
+  /**
+   * Stores the timestamp filter of a peer, creating its znode when absent.
+   * A ZooKeeper failure is logged and not rethrown.
+   * @param clusterId id of the peer cluster
+   * @param timeStamp filter value, written as a decimal string
+   */
+  public void SetTimeStampFilter(String clusterId, long timeStamp){
+    String timeStampNode = getTimeFilterZNode(clusterId);
+    try {
+      if(ZKUtil.checkExists(this.zookeeper, timeStampNode) == -1){
+        ZKUtil.createWithParents(this.zookeeper,timeStampNode);
+      }
+      ZKUtil.setData(this.zookeeper, timeStampNode,
+          Bytes.toBytes(Long.toString(timeStamp)));
+    } catch (KeeperException e) {
+      // Hand over the exception itself so the stack trace reaches the log
+      LOG.error("Set time Stamp filter in "+ timeStampNode +" error .", e);
+    }
+  }
+
+  /**
+   * Reads the timestamp filter of a peer.
+   * @param clusterId id of the peer cluster
+   * @return the stored filter, or 0 when no data is stored, the data is
+   * not a number, or ZooKeeper cannot be read
+   */
+  public long getTimeStampFilter(String clusterId){
+    String timeStampNode = getTimeFilterZNode(clusterId);
+    try {
+      String data = Bytes.toString(ZKUtil.getData(this.zookeeper, timeStampNode));
+      return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
+    } catch (KeeperException e) {
+      LOG.error("Get time Stamp filter in "+ timeStampNode +" error .", e);
+      return 0;
+    } catch (NumberFormatException e) {
+      // Unparsable data must not escape into the caller's replication loop
+      LOG.error("Get time Stamp filter in "+ timeStampNode +" error .", e);
+      return 0;
+    }
+  }
+
+  /**
+   * Deletes the timestamp filter znode of a peer if it exists.
+   * A ZooKeeper failure is logged and not rethrown.
+   * @param clusterId id of the peer cluster
+   */
+  public void dropTimeStampFilter(String clusterId){
+    String timeStampNode = getTimeFilterZNode(clusterId);
+    try {
+      if(ZKUtil.checkExists(this.zookeeper, timeStampNode) != -1){
+        ZKUtil.deleteNode(this.zookeeper,timeStampNode);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Drop time stamp filter in "+ timeStampNode +" error .", e);
+    }
+  }
+
   /**
    * Tracker for status of the replication
    */
Index: src/main/ruby/hbase/replication_admin.rb
===================================================================
--- src/main/ruby/hbase/replication_admin.rb	(revision 158995)
+++ src/main/ruby/hbase/replication_admin.rb	(working copy)
@@ -78,5 +78,23 @@
     def stop_replication
       @replication_admin.setReplicating(false)
     end
+
+    #----------------------------------------------------------------------------------------------
+    # Set cluster timeStamp for replication
+    def set_timefilter(id,timestamp)
+      @replication_admin.setTimeStampFilter(id,timestamp)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Get cluster timeStamp for replication
+    def get_timefilter(id)
+      @replication_admin.getTimeStampFilter(id)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Delete cluster timeStamp for replication
+    def drop_timefilter(id)
+      @replication_admin.dropTimeStampFilter(id)
+    end
   end
 end
Index: src/main/ruby/shell.rb
===================================================================
--- src/main/ruby/shell.rb	(revision 158995)
+++ src/main/ruby/shell.rb	(working copy)
@@ -286,6 +286,9 @@
     disable_peer
     start_replication
     stop_replication
+    set_timefilter
+    get_timefilter
+    drop_timefilter
   ]
 )
 
Index: src/main/ruby/shell/commands/drop_timefilter.rb
===================================================================
--- src/main/ruby/shell/commands/drop_timefilter.rb	(revision 0)
+++ src/main/ruby/shell/commands/drop_timefilter.rb	(revision 0)
@@ -0,0 +1,41 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class DropTimefilter< Command
+      def help
+        return <<-EOF
+Drop the replication time filter node.
+
+Examples:
+
+  hbase> drop_timefilter '1'
+EOF
+      end
+
+      def command(id)
+        format_simple_command do
+          replication_admin.drop_timefilter(id)
+        end
+      end
+    end
+  end
+end
Index: src/main/ruby/shell/commands/get_timefilter.rb
===================================================================
--- src/main/ruby/shell/commands/get_timefilter.rb	(revision 0)
+++ src/main/ruby/shell/commands/get_timefilter.rb	(revision 0)
@@ -0,0 +1,41 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class GetTimefilter< Command
+      def help
+        return <<-EOF
+get replication time filter.
+
+  hbase> get_timefilter '1'
+EOF
+      end
+
+      def command(id)
+        now = Time.now
+        ts = replication_admin.get_timefilter(id)
+        formatter.header(["PEER_ID", "TIME_FILTER"])
+        formatter.row([ id, ts ])
+        formatter.footer(now)
+      end
+    end
+  end
+end
Index: src/main/ruby/shell/commands/set_timefilter.rb
===================================================================
--- src/main/ruby/shell/commands/set_timefilter.rb	(revision 0)
+++ src/main/ruby/shell/commands/set_timefilter.rb	(revision 0)
@@ -0,0 +1,43 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetTimefilter< Command
+      def help
+        return <<-EOF
+Set a peer cluster time filter to replicate to, the row which time stamp is before
+the timestamp will be filtered.
+
+Examples:
+
+  hbase> set_timefilter '1', "1329896850047"
+  hbase> set_timefilter '2', "1329896850047"
+EOF
+      end
+
+      def command(id, timestamp)
+        format_simple_command do
+          replication_admin.set_timefilter(id, timestamp)
+        end
+      end
+    end
+  end
+end