From 3cd07cd4fb29f4588579b55056d0c6f86838ffe3 Mon Sep 17 00:00:00 2001
From: Sean Busbey
Date: Tue, 29 Jul 2014 10:18:20 -0500
Subject: [PATCH] HBASE-12522 Backport of write-ahead-log refactoring and
follow-ons.
Issues incorporated and incompatible changes called out on the jira.
* Cleaned up references to HLog
* Deprecates HLogKey but maintains it for compatibility
- Moves all Writable from WALKey to HLogKey
* Adds utility code to CoprocessorHost to help with evolving Coprocessor APIs
* RSRpcServices roll WAL call now requests the non-meta LogRoller roll all logs
- rolls actually happen asynchronously
- deprecated old api (and noted incompatible behavior change)
- modified api in new Admin interface to reflect lack of return values.
* Moved WAL user facing API to "WAL"
- only 1 sync offered
- WALTrailer removed from API
* make provider used by the WALFactory configurable.
* Move all WAL requests to use opaque ids instead of paths
* WALProvider provides API details for implementers and handles creation of WALs.
* Refactor WALActionsListener to have a basic implementation.
* turn MetricsWAL into a WALActionsListener.
* tests that needs FSHLog implementation details use them directly, others just reference provider + factory
- Some tests moved from Large to Medium based on run time.
* pull out wal disabling into its own no-op class
* update region open to delegate to WALFactory
* update performance test tool to allow for multiple regions
* Removed references to meta-specific wals within wal code
- replaced with generic suffixes
- WALFactory maintains a dedicated WALProvider for meta (and so knows about the distinction)
* maintain backwards compat on HLogPrettyPrinter and mark it deprecated.
- made WALPrettyPrinter IA.Private in favor of `bin/hbase wal`
* move WALUtil stuff that's implementation specific to said implementation
- WALUtil now acts as an integration point between the RegionServer and the WAL code.
Incorporates contributions from v.himanshu and stack.
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
hbase-server/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
hbase-server/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java
hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java
hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFiltering.java
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitCompressed.java
src/main/docbkx/ops_mgt.xml
---
bin/hbase | 7 +-
bin/hbase.cmd | 11 +-
.../hadoop/hbase/DroppedSnapshotException.java | 2 +-
.../java/org/apache/hadoop/hbase/client/Admin.java | 15 +-
.../org/apache/hadoop/hbase/client/HBaseAdmin.java | 61 +-
.../hbase/client/replication/ReplicationAdmin.java | 2 +-
.../hadoop/hbase/protobuf/ResponseConverter.java | 16 -
.../hbase/replication/ReplicationQueues.java | 24 +-
.../hbase/replication/ReplicationQueuesClient.java | 6 +-
.../replication/ReplicationQueuesClientZKImpl.java | 2 +-
.../hbase/replication/ReplicationQueuesZKImpl.java | 70 +-
.../org/apache/hadoop/hbase/zookeeper/ZKUtil.java | 12 +-
.../java/org/apache/hadoop/hbase/HConstants.java | 4 +-
.../java/org/apache/hadoop/hbase/KeyValue.java | 2 +-
hbase-common/src/main/resources/hbase-default.xml | 10 +-
.../master/MetricsMasterFileSystemSource.java | 6 +-
.../regionserver/MetricsRegionServerSource.java | 8 +-
.../regionserver/MetricsRegionServerWrapper.java | 8 +-
.../regionserver/wal/MetricsEditsReplaySource.java | 2 +-
.../hbase/regionserver/wal/MetricsWALSource.java | 12 +-
.../regionserver/wal/TestMetricsHLogSource.java | 32 -
.../regionserver/wal/TestMetricsWALSource.java | 32 +
.../MetricsRegionServerSourceImpl.java | 4 +-
.../regionserver/wal/MetricsWALSourceImpl.java | 3 +-
.../hbase/IntegrationTestIngestWithEncryption.java | 7 +-
.../hadoop/hbase/mttr/IntegrationTestMTTR.java | 4 +-
.../hbase/protobuf/generated/AdminProtos.java | 12 +
.../generated/RegionServerStatusProtos.java | 16 +-
.../hadoop/hbase/protobuf/generated/WALProtos.java | 16 +-
.../hbase/protobuf/generated/ZooKeeperProtos.java | 4 +-
hbase-protocol/src/main/protobuf/Admin.proto | 4 +
.../src/main/protobuf/RegionServerStatus.proto | 2 +-
hbase-protocol/src/main/protobuf/WAL.proto | 9 +-
hbase-protocol/src/main/protobuf/ZooKeeper.proto | 2 +-
.../tmpl/regionserver/ServerMetricsTmpl.jamon | 16 +-
.../org/apache/hadoop/hbase/SplitLogCounters.java | 2 +-
.../coordination/SplitLogWorkerCoordination.java | 4 +-
.../ZKSplitLogManagerCoordination.java | 8 +-
.../coordination/ZkSplitLogWorkerCoordination.java | 12 +-
.../hbase/coprocessor/BaseRegionObserver.java | 19 +
.../hadoop/hbase/coprocessor/BaseWALObserver.java | 23 +-
.../hadoop/hbase/coprocessor/CoprocessorHost.java | 74 +
.../hadoop/hbase/coprocessor/RegionObserver.java | 60 +-
.../coprocessor/WALCoprocessorEnvironment.java | 6 +-
.../hadoop/hbase/coprocessor/WALObserver.java | 61 +-
.../org/apache/hadoop/hbase/fs/HFileSystem.java | 16 +-
.../java/org/apache/hadoop/hbase/io/HLogLink.java | 69 -
.../java/org/apache/hadoop/hbase/io/WALLink.java | 69 +
.../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 4 +-
.../hadoop/hbase/mapreduce/HLogInputFormat.java | 235 +--
.../hbase/mapreduce/MultiTableOutputFormat.java | 4 +-
.../hbase/mapreduce/TableSnapshotInputFormat.java | 2 +-
.../hadoop/hbase/mapreduce/WALInputFormat.java | 287 +++
.../apache/hadoop/hbase/mapreduce/WALPlayer.java | 52 +-
.../hadoop/hbase/master/AssignmentManager.java | 14 +-
.../hadoop/hbase/master/MasterFileSystem.java | 45 +-
.../hbase/master/MetricsMasterFileSystem.java | 4 +-
.../apache/hadoop/hbase/master/RegionStates.java | 4 +-
.../apache/hadoop/hbase/master/ServerManager.java | 8 +-
.../hadoop/hbase/master/SplitLogManager.java | 40 +-
.../hadoop/hbase/master/cleaner/LogCleaner.java | 6 +-
.../hbase/master/cleaner/TimeToLiveLogCleaner.java | 2 +-
.../master/handler/MetaServerShutdownHandler.java | 4 +-
.../master/handler/ServerShutdownHandler.java | 20 +-
.../hbase/master/snapshot/SnapshotLogCleaner.java | 8 +-
.../hadoop/hbase/migration/NamespaceUpgrade.java | 18 +-
.../apache/hadoop/hbase/migration/UpgradeTo96.java | 7 +-
.../hbase/protobuf/ReplicationProtbufUtil.java | 20 +-
.../apache/hadoop/hbase/regionserver/HRegion.java | 415 ++--
.../hadoop/hbase/regionserver/HRegionServer.java | 232 +--
.../apache/hadoop/hbase/regionserver/HStore.java | 6 +-
.../hadoop/hbase/regionserver/LogRoller.java | 115 +-
.../hadoop/hbase/regionserver/MemStoreFlusher.java | 4 +-
.../hadoop/hbase/regionserver/MetaLogRoller.java | 38 -
.../MetricsRegionServerWrapperImpl.java | 29 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 37 +-
.../hbase/regionserver/RegionCoprocessorHost.java | 60 +-
.../hbase/regionserver/RegionServerAccounting.java | 2 +-
.../hbase/regionserver/RegionServerServices.java | 6 +-
.../hadoop/hbase/regionserver/SplitLogWorker.java | 12 +-
.../regionserver/handler/HLogSplitterHandler.java | 106 -
.../regionserver/handler/WALSplitterHandler.java | 106 +
.../hbase/regionserver/wal/CompressionContext.java | 11 +-
.../hadoop/hbase/regionserver/wal/Compressor.java | 18 +-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 709 +++----
.../hadoop/hbase/regionserver/wal/FSWALEntry.java | 11 +-
.../apache/hadoop/hbase/regionserver/wal/HLog.java | 445 ----
.../hadoop/hbase/regionserver/wal/HLogFactory.java | 207 --
.../hadoop/hbase/regionserver/wal/HLogKey.java | 430 +---
.../hbase/regionserver/wal/HLogPrettyPrinter.java | 315 +--
.../hbase/regionserver/wal/HLogSplitter.java | 1988 ------------------
.../hadoop/hbase/regionserver/wal/HLogUtil.java | 365 ----
.../hadoop/hbase/regionserver/wal/MetricsWAL.java | 13 +-
.../hbase/regionserver/wal/ProtobufLogReader.java | 40 +-
.../hbase/regionserver/wal/ProtobufLogWriter.java | 14 +-
.../hadoop/hbase/regionserver/wal/ReaderBase.java | 29 +-
.../regionserver/wal/SecureProtobufLogWriter.java | 6 +-
.../regionserver/wal/SequenceFileLogReader.java | 21 +-
.../hbase/regionserver/wal/WALActionsListener.java | 76 +-
.../hbase/regionserver/wal/WALCellCodec.java | 2 +-
.../hbase/regionserver/wal/WALCoprocessorHost.java | 67 +-
.../hadoop/hbase/regionserver/wal/WALEdit.java | 8 +-
.../hbase/regionserver/wal/WALEditsReplaySink.java | 31 +-
.../hadoop/hbase/regionserver/wal/WALUtil.java | 101 +
.../hadoop/hbase/regionserver/wal/WriterBase.java | 4 +-
.../hbase/replication/ChainWALEntryFilter.java | 2 +-
.../hbase/replication/ReplicationEndpoint.java | 8 +-
.../hbase/replication/ScopeWALEntryFilter.java | 2 +-
.../replication/SystemTableWALEntryFilter.java | 2 +-
.../hbase/replication/TableCfWALEntryFilter.java | 2 +-
.../hadoop/hbase/replication/WALEntryFilter.java | 10 +-
.../replication/master/ReplicationLogCleaner.java | 26 +-
.../HBaseInterClusterReplicationEndpoint.java | 6 +-
.../replication/regionserver/Replication.java | 36 +-
.../regionserver/ReplicationHLogReaderManager.java | 144 --
.../replication/regionserver/ReplicationSink.java | 2 +-
.../regionserver/ReplicationSource.java | 38 +-
.../regionserver/ReplicationSourceManager.java | 104 +-
.../regionserver/ReplicationWALReaderManager.java | 145 ++
.../hbase/security/access/AccessController.java | 2 +-
.../access/HbaseObjectWritableFor96Migration.java | 38 +-
.../visibility/VisibilityReplicationEndpoint.java | 2 +-
.../hadoop/hbase/snapshot/ExportSnapshot.java | 14 +-
.../apache/hadoop/hbase/snapshot/SnapshotInfo.java | 6 +-
.../hbase/snapshot/SnapshotReferenceUtil.java | 8 +-
.../org/apache/hadoop/hbase/util/FSHDFSUtils.java | 2 +-
.../org/apache/hadoop/hbase/util/FSVisitor.java | 14 +-
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 6 +-
.../java/org/apache/hadoop/hbase/util/HMerge.java | 18 +-
.../java/org/apache/hadoop/hbase/util/Merge.java | 7 +-
.../org/apache/hadoop/hbase/util/MetaUtils.java | 35 +-
.../apache/hadoop/hbase/util/RegionSplitter.java | 2 +-
.../hadoop/hbase/wal/DefaultWALProvider.java | 369 ++++
.../hadoop/hbase/wal/DisabledWALProvider.java | 213 ++
.../main/java/org/apache/hadoop/hbase/wal/WAL.java | 263 +++
.../org/apache/hadoop/hbase/wal/WALFactory.java | 430 ++++
.../java/org/apache/hadoop/hbase/wal/WALKey.java | 553 +++++
.../apache/hadoop/hbase/wal/WALPrettyPrinter.java | 407 ++++
.../org/apache/hadoop/hbase/wal/WALProvider.java | 83 +
.../org/apache/hadoop/hbase/wal/WALSplitter.java | 2171 ++++++++++++++++++++
.../apache/hadoop/hbase/zookeeper/ZKSplitLog.java | 2 +-
.../org/apache/hadoop/hbase/HBaseTestCase.java | 2 +-
.../apache/hadoop/hbase/HBaseTestingUtility.java | 22 +-
.../hadoop/hbase/MockRegionServerServices.java | 4 +-
.../org/apache/hadoop/hbase/TestIOFencing.java | 13 +-
.../hadoop/hbase/backup/TestHFileArchiving.java | 2 +-
.../org/apache/hadoop/hbase/client/TestAdmin2.java | 10 +-
.../hbase/coprocessor/SampleRegionWALObserver.java | 75 +-
.../hbase/coprocessor/SimpleRegionObserver.java | 49 +-
.../coprocessor/TestRegionObserverInterface.java | 69 +-
.../TestRegionObserverScannerOpenHook.java | 4 +-
.../hadoop/hbase/coprocessor/TestWALObserver.java | 199 +-
.../org/apache/hadoop/hbase/filter/TestFilter.java | 14 +-
.../hbase/filter/TestInvocationRecordFilter.java | 8 +-
.../apache/hadoop/hbase/fs/TestBlockReorder.java | 114 +-
.../org/apache/hadoop/hbase/io/TestHeapSize.java | 2 +-
.../hbase/mapreduce/TestHLogRecordReader.java | 230 +--
.../hadoop/hbase/mapreduce/TestImportExport.java | 55 +-
.../hbase/mapreduce/TestTableMapReduceUtil.java | 8 +-
.../hadoop/hbase/mapreduce/TestWALPlayer.java | 31 +-
.../hbase/mapreduce/TestWALRecordReader.java | 268 +++
.../hadoop/hbase/master/MockRegionServer.java | 4 +-
.../hbase/master/TestDistributedLogSplitting.java | 132 +-
.../hadoop/hbase/master/TestMasterFailover.java | 2 +-
.../master/snapshot/TestSnapshotFileCache.java | 17 +-
.../MetricsRegionServerWrapperStub.java | 4 +-
.../hbase/regionserver/TestAtomicOperation.java | 4 +-
.../regionserver/TestCacheOnWriteInSchema.java | 20 +-
.../hadoop/hbase/regionserver/TestCompaction.java | 6 +-
.../regionserver/TestDefaultCompactSelection.java | 19 +-
.../regionserver/TestGetClosestAtOrBefore.java | 6 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 227 +-
.../hbase/regionserver/TestMajorCompaction.java | 6 +-
.../hbase/regionserver/TestMinorCompaction.java | 6 +-
.../regionserver/TestRegionMergeTransaction.java | 29 +-
.../hbase/regionserver/TestSplitTransaction.java | 22 +-
.../hadoop/hbase/regionserver/TestStore.java | 16 +-
.../regionserver/TestStoreFileRefresherChore.java | 12 +-
.../hadoop/hbase/regionserver/wal/FaultyHLog.java | 70 -
.../wal/FaultySequenceFileLogReader.java | 12 +-
.../wal/HLogPerformanceEvaluation.java | 566 -----
.../hbase/regionserver/wal/HLogUtilsForTests.java | 45 -
.../regionserver/wal/InstrumentedLogWriter.java | 43 +
.../wal/InstrumentedSequenceFileLogWriter.java | 40 -
.../regionserver/wal/SequenceFileLogWriter.java | 19 +-
.../hbase/regionserver/wal/TestDurability.java | 55 +-
.../hadoop/hbase/regionserver/wal/TestFSHLog.java | 478 +++++
.../hadoop/hbase/regionserver/wal/TestHLog.java | 1341 ------------
.../hbase/regionserver/wal/TestHLogFiltering.java | 153 --
.../hbase/regionserver/wal/TestHLogMethods.java | 179 --
.../wal/TestHLogReaderOnSecureHLog.java | 197 --
.../hbase/regionserver/wal/TestHLogSplit.java | 1567 --------------
.../regionserver/wal/TestHLogSplitCompressed.java | 35 -
.../hbase/regionserver/wal/TestLogRollAbort.java | 92 +-
.../hbase/regionserver/wal/TestLogRollPeriod.java | 23 +-
.../hbase/regionserver/wal/TestLogRolling.java | 151 +-
.../regionserver/wal/TestLogRollingNoCluster.java | 34 +-
.../hbase/regionserver/wal/TestProtobufLog.java | 208 ++
.../wal/TestReadOldRootAndMetaEdits.java | 39 +-
.../hbase/regionserver/wal/TestSecureHLog.java | 129 --
.../regionserver/wal/TestSecureWALReplay.java | 6 +-
.../regionserver/wal/TestWALActionsListener.java | 63 +-
.../hbase/regionserver/wal/TestWALReplay.java | 133 +-
.../replication/TestMultiSlaveReplication.java | 63 +-
.../TestReplicationChangingPeerRegionservers.java | 2 +-
.../hbase/replication/TestReplicationEndpoint.java | 7 +-
.../TestReplicationKillMasterRSCompressed.java | 2 +-
.../replication/TestReplicationSmallTests.java | 8 +-
.../hbase/replication/TestReplicationSource.java | 24 +-
.../TestReplicationWALEntryFilters.java | 41 +-
.../TestReplicationHLogReaderManager.java | 240 ---
.../regionserver/TestReplicationSourceManager.java | 43 +-
.../TestReplicationWALReaderManager.java | 222 ++
.../TestVisibilityLabelsReplication.java | 2 +-
.../apache/hadoop/hbase/util/TestFSVisitor.java | 19 +-
.../apache/hadoop/hbase/util/TestHBaseFsck.java | 16 +-
.../apache/hadoop/hbase/util/TestMergeTool.java | 62 +-
.../org/apache/hadoop/hbase/wal/FaultyFSLog.java | 76 +
.../hadoop/hbase/wal/TestDefaultWALProvider.java | 332 +++
.../wal/TestDefaultWALProviderWithHLogKey.java | 35 +
.../org/apache/hadoop/hbase/wal/TestSecureWAL.java | 137 ++
.../apache/hadoop/hbase/wal/TestWALFactory.java | 725 +++++++
.../apache/hadoop/hbase/wal/TestWALFiltering.java | 153 ++
.../apache/hadoop/hbase/wal/TestWALMethods.java | 186 ++
.../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java | 217 ++
.../org/apache/hadoop/hbase/wal/TestWALSplit.java | 1308 ++++++++++++
.../hadoop/hbase/wal/TestWALSplitCompressed.java | 35 +
.../hadoop/hbase/wal/WALPerformanceEvaluation.java | 567 +++++
hbase-shell/src/main/ruby/hbase/admin.rb | 8 +-
hbase-shell/src/main/ruby/shell.rb | 8 +-
.../src/main/ruby/shell/commands/hlog_roll.rb | 39 -
.../src/main/ruby/shell/commands/wal_roll.rb | 42 +
src/main/docbkx/book.xml | 29 +-
src/main/docbkx/ops_mgt.xml | 17 +-
src/main/docbkx/performance.xml | 4 +-
src/main/docbkx/troubleshooting.xml | 10 +-
236 files changed, 13525 insertions(+), 11444 deletions(-)
delete mode 100644 hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsHLogSource.java
create mode 100644 hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWALSource.java
delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/HLogLink.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java
delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyHLog.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtilsForTests.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureHLog.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFiltering.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitCompressed.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
delete mode 100644 hbase-shell/src/main/ruby/shell/commands/hlog_roll.rb
create mode 100644 hbase-shell/src/main/ruby/shell/commands/wal_roll.rb
diff --git a/bin/hbase b/bin/hbase
index 7e7bf0d..31fd232 100755
--- a/bin/hbase
+++ b/bin/hbase
@@ -83,7 +83,7 @@ if [ $# = 0 ]; then
echo "Some commands take arguments. Pass no args or -h for usage."
echo " shell Run the HBase shell"
echo " hbck Run the hbase 'fsck' tool"
- echo " hlog Write-ahead-log analyzer"
+ echo " wal Write-ahead-log analyzer"
echo " hfile Store file analyzer"
echo " zkcli Run the ZooKeeper shell"
echo " upgrade Upgrade hbase"
@@ -293,8 +293,9 @@ if [ "$COMMAND" = "shell" ] ; then
CLASS="org.jruby.Main -X+O ${JRUBY_OPTS} ${HBASE_HOME}/bin/hirb.rb"
elif [ "$COMMAND" = "hbck" ] ; then
CLASS='org.apache.hadoop.hbase.util.HBaseFsck'
-elif [ "$COMMAND" = "hlog" ] ; then
- CLASS='org.apache.hadoop.hbase.regionserver.wal.HLogPrettyPrinter'
+# TODO remove old 'hlog' version
+elif [ "$COMMAND" = "hlog" -o "$COMMAND" = "wal" ] ; then
+ CLASS='org.apache.hadoop.hbase.wal.WALPrettyPrinter'
elif [ "$COMMAND" = "hfile" ] ; then
CLASS='org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter'
elif [ "$COMMAND" = "zkcli" ] ; then
diff --git a/bin/hbase.cmd b/bin/hbase.cmd
index e30e1cd..db7d856 100644
--- a/bin/hbase.cmd
+++ b/bin/hbase.cmd
@@ -210,7 +210,7 @@ goto :MakeCmdArgsLoop
set hbase-command-arguments=%_hbasearguments%
@rem figure out which class to run
-set corecommands=shell master regionserver thrift thrift2 rest avro hlog hbck hfile zookeeper zkcli upgrade mapredcp
+set corecommands=shell master regionserver thrift thrift2 rest avro hlog wal hbck hfile zookeeper zkcli upgrade mapredcp
for %%i in ( %corecommands% ) do (
if "%hbase-command%"=="%%i" set corecommand=true
)
@@ -375,8 +375,13 @@ goto :eof
set CLASS=org.apache.hadoop.hbase.util.HBaseFsck
goto :eof
+@rem TODO remove older 'hlog' command
:hlog
- set CLASS=org.apache.hadoop.hbase.regionserver.wal.HLogPrettyPrinter
+ set CLASS=org.apache.hadoop.hbase.wal.WALPrettyPrinter
+ goto :eof
+
+:wal
+ set CLASS=org.apache.hadoop.hbase.wal.WALPrettyPrinter
goto :eof
:hfile
@@ -416,7 +421,7 @@ goto :eof
echo Some commands take arguments. Pass no args or -h for usage."
echo shell Run the HBase shell
echo hbck Run the hbase 'fsck' tool
- echo hlog Write-ahead-log analyzer
+ echo wal Write-ahead-log analyzer
echo hfile Store file analyzer
echo zkcli Run the ZooKeeper shell
echo upgrade Upgrade hbase
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/DroppedSnapshotException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/DroppedSnapshotException.java
index c4799b2..830339c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/DroppedSnapshotException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/DroppedSnapshotException.java
@@ -22,7 +22,7 @@ import java.io.IOException;
/**
* Thrown during flush if the possibility snapshot content was not properly
- * persisted into store files. Response should include replay of hlog content.
+ * persisted into store files. Response should include replay of wal content.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 39adc21..509ee92 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -852,18 +852,17 @@ public interface Admin extends Abortable, Closeable {
throws IOException;
/**
- * Roll the log writer. That is, start writing log messages to a new file.
+ * Roll the log writer. I.e. for filesystem based write ahead logs, start writing to a new file.
*
- * @param serverName The servername of the regionserver. A server name is made of host, port and
- * startcode. This is mandatory. Here is an example:
- * host187.example.com,60020,1289493121758
- * @return If lots of logs, flush the returned regions so next time through we can clean logs.
- * Returns null if nothing to flush. Names are actual region names as returned by {@link
- * HRegionInfo#getEncodedName()}
+ * Note that the actual rolling of the log writer is asynchronous and may not be complete when
+ * this method returns. As a side effect of this call, the named region server may schedule
+ * store flushes at the request of the wal.
+ *
+ * @param serverName The servername of the regionserver.
* @throws IOException if a remote or network exception occurs
* @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
*/
- byte[][] rollHLogWriter(String serverName) throws IOException, FailedLogCloseException;
+ void rollWALWriter(ServerName serverName) throws IOException, FailedLogCloseException;
/**
* Helper delegage to getClusterStatus().getMasterCoprocessors().
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 508d171..a9b98c0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -2626,37 +2626,60 @@ public class HBaseAdmin implements Admin {
return getTableDescriptorsByTableName(tableNames);
}
+ private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
+ FailedLogCloseException {
+ AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+ RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
+ try {
+ return admin.rollWALWriter(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+
/**
- * Roll the log writer. That is, start writing log messages to a new file.
+ * Roll the log writer. I.e. when using a file system based write ahead log,
+ * start writing log messages to a new file.
+ *
+ * Note that when talking to a version 1.0+ HBase deployment, the rolling is asynchronous.
+ * This method will return as soon as the roll is requested and the return value will
+ * always be null. Additionally, the named region server may schedule store flushes at the
+ * request of the wal handling the roll request.
+ *
+ * When talking to a 0.98 or older HBase deployment, the rolling is synchronous and the
+ * return value may be either null or a list of encoded region names.
*
* @param serverName
* The servername of the regionserver. A server name is made of host,
* port and startcode. This is mandatory. Here is an example:
* host187.example.com,60020,1289493121758
- * @return If lots of logs, flush the returned regions so next time through
- * we can clean logs. Returns null if nothing to flush. Names are actual
- * region names as returned by {@link HRegionInfo#getEncodedName()}
+ * @return a set of {@link HRegionInfo#getEncodedName()} that would allow the wal to
+ * clean up some underlying files. null if there's nothing to flush.
* @throws IOException if a remote or network exception occurs
* @throws FailedLogCloseException
+ * @deprecated use {@link #rollWALWriter(ServerName)}
*/
- @Override
- public synchronized byte[][] rollHLogWriter(String serverName)
+ @Deprecated
+ public synchronized byte[][] rollHLogWriter(String serverName)
throws IOException, FailedLogCloseException {
ServerName sn = ServerName.valueOf(serverName);
- AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
- try {
- RollWALWriterResponse response = admin.rollWALWriter(null, request);
- int regionCount = response.getRegionToFlushCount();
- byte[][] regionsToFlush = new byte[regionCount][];
- for (int i = 0; i < regionCount; i++) {
- ByteString region = response.getRegionToFlush(i);
- regionsToFlush[i] = region.toByteArray();
- }
- return regionsToFlush;
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ final RollWALWriterResponse response = rollWALWriterImpl(sn);
+ int regionCount = response.getRegionToFlushCount();
+ if (0 == regionCount) {
+ return null;
}
+ byte[][] regionsToFlush = new byte[regionCount][];
+ for (int i = 0; i < regionCount; i++) {
+ ByteString region = response.getRegionToFlush(i);
+ regionsToFlush[i] = region.toByteArray();
+ }
+ return regionsToFlush;
+ }
+
+ @Override
+ public synchronized void rollWALWriter(ServerName serverName)
+ throws IOException, FailedLogCloseException {
+ rollWALWriterImpl(serverName);
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 16c28f0..74b413b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -67,7 +67,7 @@ import com.google.common.collect.Lists;
* Adding a new peer results in creating new outbound connections from every
* region server to a subset of region servers on the slave cluster. Each
* new stream of replication will start replicating from the beginning of the
- * current HLog, meaning that edits from that past will be replicated.
+ * current WAL, meaning that edits from the past will be replicated.
*
*
* Removing a peer is a destructive and irreversible operation that stops
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 8b7e6f2..725736a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRespons
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
@@ -184,21 +183,6 @@ public final class ResponseConverter {
// Start utilities for Admin
/**
- * Get the list of regions to flush from a RollLogWriterResponse
- *
- * @param proto the RollLogWriterResponse
- * @return the the list of regions to flush
- */
- public static byte[][] getRegions(final RollWALWriterResponse proto) {
- if (proto == null || proto.getRegionToFlushCount() == 0) return null;
- List regions = new ArrayList();
- for (ByteString region: proto.getRegionToFlushList()) {
- regions.add(region.toByteArray());
- }
- return (byte[][])regions.toArray();
- }
-
- /**
* Get the list of region info from a GetOnlineRegionResponse
*
* @param proto the GetOnlineRegionResponse
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 611f663..3dbbc33 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This provides an interface for maintaining a region server's replication queues. These queues
- * keep track of the HLogs that still need to be replicated to remote clusters.
+ * keep track of the WALs that still need to be replicated to remote clusters.
*/
@InterfaceAudience.Private
public interface ReplicationQueues {
@@ -45,31 +45,31 @@ public interface ReplicationQueues {
void removeQueue(String queueId);
/**
- * Add a new HLog file to the given queue. If the queue does not exist it is created.
+ * Add a new WAL file to the given queue. If the queue does not exist it is created.
* @param queueId a String that identifies the queue.
- * @param filename name of the HLog
+ * @param filename name of the WAL
*/
void addLog(String queueId, String filename) throws ReplicationException;
/**
- * Remove an HLog file from the given queue.
+ * Remove a WAL file from the given queue.
* @param queueId a String that identifies the queue.
- * @param filename name of the HLog
+ * @param filename name of the WAL
*/
void removeLog(String queueId, String filename);
/**
- * Set the current position for a specific HLog in a given queue.
+ * Set the current position for a specific WAL in a given queue.
* @param queueId a String that identifies the queue
- * @param filename name of the HLog
+ * @param filename name of the WAL
* @param position the current position in the file
*/
void setLogPosition(String queueId, String filename, long position);
/**
- * Get the current position for a specific HLog in a given queue.
+ * Get the current position for a specific WAL in a given queue.
* @param queueId a String that identifies the queue
- * @param filename name of the HLog
+ * @param filename name of the WAL
* @return the current position in the file
*/
long getLogPosition(String queueId, String filename) throws ReplicationException;
@@ -80,9 +80,9 @@ public interface ReplicationQueues {
void removeAllQueues();
/**
- * Get a list of all HLogs in the given queue.
+ * Get a list of all WALs in the given queue.
* @param queueId a String that identifies the queue
- * @return a list of HLogs, null if this region server is dead and has no outstanding queues
+ * @return a list of WALs, null if this region server is dead and has no outstanding queues
*/
List getLogsInQueue(String queueId);
@@ -95,7 +95,7 @@ public interface ReplicationQueues {
/**
* Take ownership for the set of queues belonging to a dead region server.
* @param regionserver the id of the dead region server
- * @return A SortedMap of the queues that have been claimed, including a SortedSet of HLogs in
+ * @return A SortedMap of the queues that have been claimed, including a SortedSet of WALs in
* each queue. Returns an empty map if no queues were failed-over.
*/
SortedMap> claimQueues(String regionserver);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index 5c068be..fed1791 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This provides an interface for clients of replication to view replication queues. These queues
- * keep track of the HLogs that still need to be replicated to remote clusters.
+ * keep track of the WALs that still need to be replicated to remote clusters.
*/
@InterfaceAudience.Private
public interface ReplicationQueuesClient {
@@ -43,10 +43,10 @@ public interface ReplicationQueuesClient {
List getListOfReplicators();
/**
- * Get a list of all HLogs in the given queue on the given region server.
+ * Get a list of all WALs in the given queue on the given region server.
* @param serverName the server name of the region server that owns the queue
* @param queueId a String that identifies the queue
- * @return a list of HLogs, null if this region server is dead and has no outstanding queues
+ * @return a list of WALs, null if this region server is dead and has no outstanding queues
*/
List getLogsInQueue(String serverName, String queueId);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index 3bc4f48..43262a0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -53,7 +53,7 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
- this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId
+ this.abortable.abort("Failed to get list of wals for queueId=" + queueId
+ " and serverName=" + serverName, e);
}
return result;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 9abb94b..6a30511 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -43,21 +43,21 @@ import org.apache.zookeeper.KeeperException;
/**
* This class provides an implementation of the ReplicationQueues interface using Zookeeper. The
* base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
- * all outstanding HLog files on this region server that need to be replicated. The myQueuesZnode is
+ * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is
* the regionserver name (a concatenation of the region server’s hostname, client port and start
* code). For example:
*
* /hbase/replication/rs/hostname.example.org,6020,1234
*
- * Within this znode, the region server maintains a set of HLog replication queues. These queues are
+ * Within this znode, the region server maintains a set of WAL replication queues. These queues are
* represented by child znodes named using there give queue id. For example:
*
* /hbase/replication/rs/hostname.example.org,6020,1234/1
* /hbase/replication/rs/hostname.example.org,6020,1234/2
*
- * Each queue has one child znode for every HLog that still needs to be replicated. The value of
- * these HLog child znodes is the latest position that has been replicated. This position is updated
- * every time a HLog entry is replicated. For example:
+ * Each queue has one child znode for every WAL that still needs to be replicated. The value of
+ * these WAL child znodes is the latest position that has been replicated. This position is updated
+ * every time a WAL entry is replicated. For example:
*
* /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
*/
@@ -115,7 +115,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
znode = ZKUtil.joinZNode(znode, filename);
ZKUtil.deleteNode(this.zookeeper, znode);
} catch (KeeperException e) {
- this.abortable.abort("Failed to remove hlog from queue (queueId=" + queueId + ", filename="
+ this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
+ filename + ")", e);
}
}
@@ -128,7 +128,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
// Why serialize String of Long and not Long as bytes?
ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
} catch (KeeperException e) {
- this.abortable.abort("Failed to write replication hlog position (filename=" + filename
+ this.abortable.abort("Failed to write replication wal position (filename=" + filename
+ ", position=" + position + ")", e);
}
}
@@ -148,12 +148,12 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
return 0;
}
try {
- return ZKUtil.parseHLogPositionFrom(bytes);
+ return ZKUtil.parseWALPositionFrom(bytes);
} catch (DeserializationException de) {
- LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename
+ LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
+ "znode content, continuing.");
}
- // if we can not parse the position, start at the beginning of the hlog file
+ // if we can not parse the position, start at the beginning of the wal file
// again
return 0;
}
@@ -168,10 +168,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
SortedMap> newQueues = new TreeMap>();
// check whether there is multi support. If yes, use it.
if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
- LOG.info("Atomically moving " + regionserverZnode + "'s hlogs to my queue");
+ LOG.info("Atomically moving " + regionserverZnode + "'s wals to my queue");
newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
} else {
- LOG.info("Moving " + regionserverZnode + "'s hlogs to my queue");
+ LOG.info("Moving " + regionserverZnode + "'s wals to my queue");
if (!lockOtherRS(regionserverZnode)) {
return newQueues;
}
@@ -202,7 +202,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
- this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId, e);
+ this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
}
return result;
}
@@ -285,10 +285,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
}
/**
- * It "atomically" copies all the hlogs queues from another region server and returns them all
+ * It "atomically" copies all the wal queues from another region server and returns them all
* sorted per peer cluster (appended with the dead server's znode).
* @param znode pertaining to the region server to copy the queues from
- * @return HLog queues sorted per peer cluster
+ * @return WAL queues sorted per peer cluster
*/
private SortedMap> copyQueuesFromRSUsingMulti(String znode) {
SortedMap> queues = new TreeMap>();
@@ -310,8 +310,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
// check the logs queue for the old peer cluster
String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
- List hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
- if (hlogs == null || hlogs.size() == 0) {
+ List wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+ if (wals == null || wals.size() == 0) {
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
continue; // empty log queue.
}
@@ -321,15 +321,15 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
listOfOps.add(op);
// get the offset of the logs and set it to new znodes
- for (String hlog : hlogs) {
- String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
- byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
- LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
- String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
+ for (String wal : wals) {
+ String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
+ byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
+ LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
+ String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
// add ops for deleting
- listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
- logQueue.add(hlog);
+ listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
+ logQueue.add(wal);
}
// add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
@@ -352,10 +352,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
}
/**
- * This methods copies all the hlogs queues from another region server and returns them all sorted
+ * This method copies all the wal queues from another region server and returns them all sorted
* per peer cluster (appended with the dead server's znode)
* @param znode server names to copy
- * @return all hlogs for all peers of that cluster, null if an error occurred
+ * @return all wals for all peers of that cluster, null if an error occurred
*/
private SortedMap> copyQueuesFromRS(String znode) {
// TODO this method isn't atomic enough, we could start copying and then
@@ -383,31 +383,31 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
String newCluster = cluster + "-" + znode;
String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
- List hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
+ List wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
// That region server didn't have anything to replicate for this cluster
- if (hlogs == null || hlogs.size() == 0) {
+ if (wals == null || wals.size() == 0) {
continue;
}
ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
HConstants.EMPTY_BYTE_ARRAY);
SortedSet logQueue = new TreeSet();
queues.put(newCluster, logQueue);
- for (String hlog : hlogs) {
- String z = ZKUtil.joinZNode(clusterPath, hlog);
+ for (String wal : wals) {
+ String z = ZKUtil.joinZNode(clusterPath, wal);
byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
long position = 0;
try {
- position = ZKUtil.parseHLogPositionFrom(positionBytes);
+ position = ZKUtil.parseWALPositionFrom(positionBytes);
} catch (DeserializationException e) {
- LOG.warn("Failed parse of hlog position from the following znode: " + z
+ LOG.warn("Failed parse of wal position from the following znode: " + z
+ ", Exception: " + e);
}
- LOG.debug("Creating " + hlog + " with data " + position);
- String child = ZKUtil.joinZNode(newClusterZnode, hlog);
+ LOG.debug("Creating " + wal + " with data " + position);
+ String child = ZKUtil.joinZNode(newClusterZnode, wal);
// Position doesn't actually change, we are just deserializing it for
// logging, so just use the already serialized version
ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
- logQueue.add(hlog);
+ logQueue.add(wal);
}
}
} catch (KeeperException e) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index aef81e0..d63a206 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1673,7 +1673,7 @@ public class ZKUtil {
if (data != null && data.length > 0) { // log position
long position = 0;
try {
- position = ZKUtil.parseHLogPositionFrom(ZKUtil.getData(zkw, znodeToProcess));
+ position = ZKUtil.parseWALPositionFrom(ZKUtil.getData(zkw, znodeToProcess));
sb.append(position);
} catch (DeserializationException ignored) {
} catch (InterruptedException e) {
@@ -1924,7 +1924,7 @@ public class ZKUtil {
/**
* @param position
* @return Serialized protobuf of position with pb magic prefix prepended suitable
- * for use as content of an hlog position in a replication queue.
+ * for use as content of a wal position in a replication queue.
*/
public static byte[] positionToByteArray(final long position) {
byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
@@ -1933,13 +1933,13 @@ public class ZKUtil {
}
/**
- * @param bytes - Content of a HLog position znode.
- * @return long - The current HLog position.
+ * @param bytes - Content of a WAL position znode.
+ * @return long - The current WAL position.
* @throws DeserializationException
*/
- public static long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
+ public static long parseWALPositionFrom(final byte[] bytes) throws DeserializationException {
if (bytes == null) {
- throw new DeserializationException("Unable to parse null HLog position.");
+ throw new DeserializationException("Unable to parse null WAL position.");
}
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 223a0e3..454f346 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -874,7 +874,7 @@ public final class HConstants {
/** File permission umask to use when creating hbase data files */
public static final String DATA_FILE_UMASK_KEY = "hbase.data.umask";
- /** Configuration name of HLog Compression */
+ /** Configuration name of WAL Compression */
public static final String ENABLE_WAL_COMPRESSION =
"hbase.regionserver.wal.enablecompression";
@@ -1006,7 +1006,7 @@ public final class HConstants {
/** Configuration key for the name of the master WAL encryption key for the cluster, a string */
public static final String CRYPTO_WAL_KEY_NAME_CONF_KEY = "hbase.crypto.wal.key.name";
- /** Configuration key for enabling HLog encryption, a boolean */
+ /** Configuration key for enabling WAL encryption, a boolean */
public static final String ENABLE_WAL_ENCRYPTION = "hbase.regionserver.wal.encryption";
/** Configuration key for setting RPC codec class name */
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index f6cb93d..b4b5755 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -1195,7 +1195,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
/**
* Produces a string map for this key/value pair. Useful for programmatic use
- * and manipulation of the data stored in an HLogKey, for example, printing
+ * and manipulation of the data stored in a WALKey, for example, printing
* as JSON. Values are left out due to their tendency to be large. If needed,
* they can be added manually.
*
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 15a53dc..4a24a43 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -112,8 +112,8 @@ possible configurations would overwhelm and obscure the important.
hbase.master.logcleaner.pluginsorg.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleanerA comma-separated list of BaseLogCleanerDelegate invoked by
- the LogsCleaner service. These WAL/HLog cleaners are called in order,
- so put the HLog cleaner that prunes the most HLog files in front. To
+ the LogsCleaner service. These WAL cleaners are called in order,
+ so put the cleaner that prunes the most files in front. To
implement your own BaseLogCleanerDelegate, just put it in HBase's classpath
and add the fully qualified class name here. Always add the above
default log cleaners in the list.
@@ -121,7 +121,7 @@ possible configurations would overwhelm and obscure the important.
hbase.master.logcleaner.ttl600000
- Maximum time a HLog can stay in the .oldlogdir directory,
+ Maximum time a WAL can stay in the .oldlogdir directory,
after which it will be cleaned by a Master thread.
@@ -265,12 +265,12 @@ possible configurations would overwhelm and obscure the important.
hbase.regionserver.hlog.reader.implorg.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader
- The HLog file reader implementation.
+ The WAL file reader implementation.hbase.regionserver.hlog.writer.implorg.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter
- The HLog file writer implementation.
+ The WAL file writer implementation.hbase.master.distributed.log.replay
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterFileSystemSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterFileSystemSource.java
index 2307599..6cf942b 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterFileSystemSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterFileSystemSource.java
@@ -48,9 +48,9 @@ public interface MetricsMasterFileSystemSource extends BaseSource {
String SPLIT_SIZE_NAME = "hlogSplitSize";
String META_SPLIT_TIME_DESC = "Time it takes to finish splitMetaLog()";
- String META_SPLIT_SIZE_DESC = "Size of hbase:meta HLog files being split";
- String SPLIT_TIME_DESC = "Time it takes to finish HLog.splitLog()";
- String SPLIT_SIZE_DESC = "Size of HLog files being split";
+ String META_SPLIT_SIZE_DESC = "Size of hbase:meta WAL files being split";
+ String SPLIT_TIME_DESC = "Time it takes to finish WAL.splitLog()";
+ String SPLIT_SIZE_DESC = "Size of WAL files being split";
void updateMetaWALSplitTime(long time);
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index 5bbab08..c64cc88 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -129,10 +129,10 @@ public interface MetricsRegionServerSource extends BaseSource {
String REGION_COUNT_DESC = "Number of regions";
String STORE_COUNT = "storeCount";
String STORE_COUNT_DESC = "Number of Stores";
- String HLOGFILE_COUNT = "hlogFileCount";
- String HLOGFILE_COUNT_DESC = "Number of HLog Files";
- String HLOGFILE_SIZE = "hlogFileSize";
- String HLOGFILE_SIZE_DESC = "Size of all HLog Files";
+ String WALFILE_COUNT = "hlogFileCount";
+ String WALFILE_COUNT_DESC = "Number of WAL Files";
+ String WALFILE_SIZE = "hlogFileSize";
+ String WALFILE_SIZE_DESC = "Size of all WAL Files";
String STOREFILE_COUNT = "storeFileCount";
String STOREFILE_COUNT_DESC = "Number of Store Files";
String MEMSTORE_SIZE = "memStoreSize";
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
index 32eea04..dea2440 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
@@ -68,14 +68,14 @@ public interface MetricsRegionServerWrapper {
long getNumStores();
/**
- * Get the number of HLog files of this region server.
+ * Get the number of WAL files of this region server.
*/
- public long getNumHLogFiles();
+ public long getNumWALFiles();
/**
- * Get the size of HLog files of this region server.
+ * Get the size of WAL files of this region server.
*/
- public long getHLogFileSize();
+ public long getWALFileSize();
/**
* Get the number of store files hosted on this region server.
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsEditsReplaySource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsEditsReplaySource.java
index 793429d..4f8cb36 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsEditsReplaySource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsEditsReplaySource.java
@@ -39,7 +39,7 @@ public interface MetricsEditsReplaySource extends BaseSource {
/**
* Description
*/
- String METRICS_DESCRIPTION = "Metrics about HBase RegionServer HLog Edits Replay";
+ String METRICS_DESCRIPTION = "Metrics about HBase RegionServer WAL Edits Replay";
/**
* The name of the metrics context that metrics will be under in jmx
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
index 1c59f65..ba0df80 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.hbase.metrics.BaseSource;
/**
- * Interface of the source that will export metrics about the region server's HLog.
+ * Interface of the source that will export metrics about the region server's WAL.
*/
public interface MetricsWALSource extends BaseSource {
@@ -39,7 +39,7 @@ public interface MetricsWALSource extends BaseSource {
/**
* Description
*/
- String METRICS_DESCRIPTION = "Metrics about HBase RegionServer HLog";
+ String METRICS_DESCRIPTION = "Metrics about HBase RegionServer WAL";
/**
* The name of the metrics context that metrics will be under in jmx
@@ -52,11 +52,11 @@ public interface MetricsWALSource extends BaseSource {
String APPEND_COUNT = "appendCount";
String APPEND_COUNT_DESC = "Number of appends to the write ahead log.";
String APPEND_SIZE = "appendSize";
- String APPEND_SIZE_DESC = "Size (in bytes) of the data appended to the HLog.";
+ String APPEND_SIZE_DESC = "Size (in bytes) of the data appended to the WAL.";
String SLOW_APPEND_COUNT = "slowAppendCount";
String SLOW_APPEND_COUNT_DESC = "Number of appends that were slow.";
String SYNC_TIME = "syncTime";
- String SYNC_TIME_DESC = "The time it took to sync the HLog to HDFS.";
+ String SYNC_TIME_DESC = "The time it took to sync the WAL to HDFS.";
/**
* Add the append size.
@@ -69,7 +69,7 @@ public interface MetricsWALSource extends BaseSource {
void incrementAppendTime(long time);
/**
- * Increment the count of hlog appends
+ * Increment the count of wal appends
*/
void incrementAppendCount();
@@ -79,7 +79,7 @@ public interface MetricsWALSource extends BaseSource {
void incrementSlowAppendCount();
/**
- * Add the time it took to sync the hlog.
+ * Add the time it took to sync the wal.
*/
void incrementSyncTime(long time);
diff --git a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsHLogSource.java b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsHLogSource.java
deleted file mode 100644
index b2bf1f2..0000000
--- a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsHLogSource.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
-import org.junit.Test;
-
-public class TestMetricsHLogSource {
-
- @Test(expected=RuntimeException.class)
- public void testGetInstanceNoHadoopCompat() throws Exception {
- //This should throw an exception because there is no compat lib on the class path.
- CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
-
- }
-}
diff --git a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWALSource.java b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWALSource.java
new file mode 100644
index 0000000..5254198
--- /dev/null
+++ b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWALSource.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.junit.Test;
+
+public class TestMetricsWALSource {
+
+ @Test(expected=RuntimeException.class)
+ public void testGetInstanceNoHadoopCompat() throws Exception {
+ //This should throw an exception because there is no compat lib on the class path.
+ CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
+
+ }
+}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index c7b26b6..cb12aa1 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -168,8 +168,8 @@ public class MetricsRegionServerSourceImpl
if (rsWrap != null) {
mrb.addGauge(Interns.info(REGION_COUNT, REGION_COUNT_DESC), rsWrap.getNumOnlineRegions())
.addGauge(Interns.info(STORE_COUNT, STORE_COUNT_DESC), rsWrap.getNumStores())
- .addGauge(Interns.info(HLOGFILE_COUNT, HLOGFILE_COUNT_DESC), rsWrap.getNumHLogFiles())
- .addGauge(Interns.info(HLOGFILE_SIZE, HLOGFILE_SIZE_DESC), rsWrap.getHLogFileSize())
+ .addGauge(Interns.info(WALFILE_COUNT, WALFILE_COUNT_DESC), rsWrap.getNumWALFiles())
+ .addGauge(Interns.info(WALFILE_SIZE, WALFILE_SIZE_DESC), rsWrap.getWALFileSize())
.addGauge(Interns.info(STOREFILE_COUNT, STOREFILE_COUNT_DESC), rsWrap.getNumStoreFiles())
.addGauge(Interns.info(MEMSTORE_SIZE, MEMSTORE_SIZE_DESC), rsWrap.getMemstoreSize())
.addGauge(Interns.info(STOREFILE_SIZE, STOREFILE_SIZE_DESC), rsWrap.getStoreFileSize())
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
index ad8f24c..d602d2f 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
@@ -25,9 +25,10 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
/**
- * Class that transitions metrics from HLog's MetricsWAL into the metrics subsystem.
+ * Class that transitions metrics from MetricsWAL into the metrics subsystem.
*
* Implements BaseSource through BaseSourceImpl, following the pattern.
+ * @see org.apache.hadoop.hbase.regionserver.wal.MetricsWAL
*/
@InterfaceAudience.Private
public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSource {
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
index f4246a2..f4f5a2c 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
@@ -26,7 +26,8 @@ import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileReaderV3;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV3;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
import org.apache.hadoop.util.ToolRunner;
@@ -60,9 +61,9 @@ public class IntegrationTestIngestWithEncryption extends IntegrationTestIngest {
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
- HLog.Reader.class);
+ Reader.class);
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
- HLog.Writer.class);
+ Writer.class);
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
}
super.setUpCluster();
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java
index de5281b..955ed18 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java
@@ -82,7 +82,7 @@ import com.google.common.base.Objects;
*
*
* Load Test Tool.
- * This runs so that all RegionServers will have some load and HLogs will be full.
+ * This runs so that all RegionServers will have some load and WALs will be full.
*
*
* Scan thread.
@@ -151,7 +151,7 @@ public class IntegrationTestMTTR {
private static Action restartMasterAction;
/**
- * The load test tool used to create load and make sure that HLogs aren't empty.
+ * The load test tool used to create load and make sure that WALs aren't empty.
*/
private static LoadTestTool loadTool;
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
index 9ffe0fd..3828742 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
@@ -16956,6 +16956,12 @@ public final class AdminProtos {
}
/**
* Protobuf type {@code RollWALWriterResponse}
+ *
+ *
+ *
+ * Roll request responses no longer include regions to flush
+ * this list will always be empty when talking to a 1.0 server
+ *
*/
public static final class RollWALWriterResponse extends
com.google.protobuf.GeneratedMessage
@@ -17241,6 +17247,12 @@ public final class AdminProtos {
}
/**
* Protobuf type {@code RollWALWriterResponse}
+ *
+ *
+ *
+ * Roll request responses no longer include regions to flush
+ * this list will always be empty when talking to a 1.0 server
+ *
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionServerStatusProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionServerStatusProtos.java
index 19a8d74..ec169d5 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionServerStatusProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionServerStatusProtos.java
@@ -4496,7 +4496,7 @@ public final class RegionServerStatusProtos {
* required uint64 last_flushed_sequence_id = 1;
*
*
- ** the last HLog sequence id flushed from MemStore to HFile for the region
+ * the last WAL sequence id flushed from MemStore to HFile for the region
*
*/
boolean hasLastFlushedSequenceId();
@@ -4504,7 +4504,7 @@ public final class RegionServerStatusProtos {
* required uint64 last_flushed_sequence_id = 1;
*
*
- ** the last HLog sequence id flushed from MemStore to HFile for the region
+ * the last WAL sequence id flushed from MemStore to HFile for the region
*
*/
long getLastFlushedSequenceId();
@@ -4612,7 +4612,7 @@ public final class RegionServerStatusProtos {
* required uint64 last_flushed_sequence_id = 1;
*
*
- ** the last HLog sequence id flushed from MemStore to HFile for the region
+ * the last WAL sequence id flushed from MemStore to HFile for the region
*
*/
public boolean hasLastFlushedSequenceId() {
@@ -4622,7 +4622,7 @@ public final class RegionServerStatusProtos {
* required uint64 last_flushed_sequence_id = 1;
*
*
- ** the last HLog sequence id flushed from MemStore to HFile for the region
+ * the last WAL sequence id flushed from MemStore to HFile for the region
*
*/
public long getLastFlushedSequenceId() {
@@ -4908,7 +4908,7 @@ public final class RegionServerStatusProtos {
* required uint64 last_flushed_sequence_id = 1;
*
*
- ** the last HLog sequence id flushed from MemStore to HFile for the region
+ * the last WAL sequence id flushed from MemStore to HFile for the region
*
*/
public boolean hasLastFlushedSequenceId() {
@@ -4918,7 +4918,7 @@ public final class RegionServerStatusProtos {
* required uint64 last_flushed_sequence_id = 1;
*
*
- ** the last HLog sequence id flushed from MemStore to HFile for the region
+ * the last WAL sequence id flushed from MemStore to HFile for the region
*
*/
public long getLastFlushedSequenceId() {
@@ -4928,7 +4928,7 @@ public final class RegionServerStatusProtos {
* required uint64 last_flushed_sequence_id = 1;
*
*
- ** the last HLog sequence id flushed from MemStore to HFile for the region
+ * the last WAL sequence id flushed from MemStore to HFile for the region
*
*/
public Builder setLastFlushedSequenceId(long value) {
@@ -4941,7 +4941,7 @@ public final class RegionServerStatusProtos {
* required uint64 last_flushed_sequence_id = 1;
*
*
- ** the last HLog sequence id flushed from MemStore to HFile for the region
+ * the last WAL sequence id flushed from MemStore to HFile for the region
*
*/
public Builder clearLastFlushedSequenceId() {
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index af61d47..977db42 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -1242,7 +1242,9 @@ public final class WALProtos {
* Protobuf type {@code WALKey}
*
*
- * Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs
+ *
+ * Protocol buffer version of WALKey; see WALKey comment, not really a key but WALEdit header
+ * for some KVs
*
*/
public static final class WALKey extends
@@ -2033,7 +2035,9 @@ public final class WALProtos {
* Protobuf type {@code WALKey}
*
*
- * Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs
+ *
+ * Protocol buffer version of WALKey; see WALKey comment, not really a key but WALEdit header
+ * for some KVs
*
*/
public static final class Builder extends
@@ -10021,8 +10025,10 @@ public final class WALProtos {
*
*
**
- * A trailer that is appended to the end of a properly closed HLog WAL file.
+ * A trailer that is appended to the end of a properly closed WAL file.
* If missing, this is either a legacy or a corrupted WAL file.
+ * N.B. This trailer currently doesn't contain any information and we
+ * purposefully don't expose it in the WAL APIs. It's for future growth.
*
*/
public static final class WALTrailer extends
@@ -10246,8 +10252,10 @@ public final class WALProtos {
*
*
**
- * A trailer that is appended to the end of a properly closed HLog WAL file.
+ * A trailer that is appended to the end of a properly closed WAL file.
* If missing, this is either a legacy or a corrupted WAL file.
+ * N.B. This trailer currently doesn't contain any information and we
+ * purposefully don't expose it in the WAL APIs. It's for future growth.
*
*/
public static final class Builder extends
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 8bad13f..6a6cb5e 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -7105,7 +7105,7 @@ public final class ZooKeeperProtos {
*
*
**
- * Used by replication. Holds the current position in an HLog file.
+ * Used by replication. Holds the current position in an WAL file.
*
*/
public static final class ReplicationHLogPosition extends
@@ -7373,7 +7373,7 @@ public final class ZooKeeperProtos {
*
*
**
- * Used by replication. Holds the current position in an HLog file.
+ * Used by replication. Holds the current position in an WAL file.
*
*/
public static final class Builder extends
diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto
index ec34e72..fcc4e1d 100644
--- a/hbase-protocol/src/main/protobuf/Admin.proto
+++ b/hbase-protocol/src/main/protobuf/Admin.proto
@@ -204,6 +204,10 @@ message ReplicateWALEntryResponse {
message RollWALWriterRequest {
}
+/*
+ * Roll request responses no longer include regions to flush
+ * this list will always be empty when talking to a 1.0 server
+ */
message RollWALWriterResponse {
// A list of encoded name of regions to flush
repeated bytes region_to_flush = 1;
diff --git a/hbase-protocol/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol/src/main/protobuf/RegionServerStatus.proto
index df9a521..75e5ae4 100644
--- a/hbase-protocol/src/main/protobuf/RegionServerStatus.proto
+++ b/hbase-protocol/src/main/protobuf/RegionServerStatus.proto
@@ -74,7 +74,7 @@ message GetLastFlushedSequenceIdRequest {
}
message GetLastFlushedSequenceIdResponse {
- /** the last HLog sequence id flushed from MemStore to HFile for the region */
+ /* the last WAL sequence id flushed from MemStore to HFile for the region */
required uint64 last_flushed_sequence_id = 1;
}
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index dae92d2..f8a1534 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -31,7 +31,10 @@ message WALHeader {
optional string cell_codec_cls_name = 5;
}
-// Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs
+/*
+ * Protocol buffer version of WALKey; see WALKey comment, not really a key but WALEdit header
+ * for some KVs
+ */
message WALKey {
required bytes encoded_region_name = 1;
required bytes table_name = 2;
@@ -144,8 +147,10 @@ message RegionEventDescriptor {
}
/**
- * A trailer that is appended to the end of a properly closed HLog WAL file.
+ * A trailer that is appended to the end of a properly closed WAL file.
* If missing, this is either a legacy or a corrupted WAL file.
+ * N.B. This trailer currently doesn't contain any information and we
+ * purposefully don't expose it in the WAL APIs. It's for future growth.
*/
message WALTrailer {
}
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index 2ccfe8d..bd1dc30 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -142,7 +142,7 @@ message ReplicationState {
}
/**
- * Used by replication. Holds the current position in an HLog file.
+ * Used by replication. Holds the current position in an WAL file.
*/
message ReplicationHLogPosition {
required int64 position = 1;
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ServerMetricsTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ServerMetricsTmpl.jamon
index d09c4bf..c496bf6 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ServerMetricsTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ServerMetricsTmpl.jamon
@@ -40,7 +40,7 @@ java.lang.management.ManagementFactory;
%def>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
index b9614b4..f1a8c59 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
@@ -32,7 +32,7 @@ public class SplitLogCounters {
public final static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0);
public final static AtomicLong tot_mgr_log_split_batch_success = new AtomicLong(0);
public final static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0);
- public final static AtomicLong tot_mgr_new_unexpected_hlogs = new AtomicLong(0);
+ public final static AtomicLong tot_mgr_new_unexpected_wals = new AtomicLong(0);
public final static AtomicLong tot_mgr_log_split_start = new AtomicLong(0);
public final static AtomicLong tot_mgr_log_split_success = new AtomicLong(0);
public final static AtomicLong tot_mgr_log_split_err = new AtomicLong(0);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
index 0566a07..164c136 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
@@ -44,7 +44,7 @@ import com.google.common.annotations.VisibleForTesting;
* for external changes in coordination (if required)
* {@link #endTask(SplitLogTask, AtomicLong, SplitTaskDetails)} notify coordination engine that
*
- * Important methods for HLogSplitterHandler:
+ * Important methods for WALSplitterHandler:
* splitting task has completed.
*/
@InterfaceAudience.Private
@@ -112,7 +112,7 @@ public interface SplitLogWorkerCoordination {
*/
void removeListener();
- /* HLogSplitterHandler part */
+ /* WALSplitterHandler part */
/**
* Notify coordination engine that splitting task has completed.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index 1d9cbf9..1e02632 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -51,8 +51,8 @@ import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -113,7 +113,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
@Override
public Status finish(ServerName workerName, String logfile) {
try {
- HLogSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration());
+ WALSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration());
} catch (IOException e) {
LOG.warn("Could not finish splitting of log file " + logfile, e);
return Status.ERR;
@@ -715,7 +715,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
}
// decode the file name
t = ZKSplitLog.getFileName(t);
- ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
+ ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(new Path(t));
if (serverName != null) {
knownFailedServers.add(serverName.getServerName());
} else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index 4bf41c5..a0addb0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
-import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
@@ -317,8 +317,8 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
splitTaskDetails.setTaskNode(curTask);
splitTaskDetails.setCurTaskZKVersion(zkVersion);
- HLogSplitterHandler hsh =
- new HLogSplitterHandler(server, this, splitTaskDetails, reporter,
+ WALSplitterHandler hsh =
+ new WALSplitterHandler(server, this, splitTaskDetails, reporter,
this.tasksInProgress, splitTaskExecutor, mode);
server.getExecutorService().submit(hsh);
}
@@ -417,7 +417,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
// pick meta wal firstly
int offset = (int) (Math.random() * paths.size());
for (int i = 0; i < paths.size(); i++) {
- if (HLogUtil.isMetaFile(paths.get(i))) {
+ if (DefaultWALProvider.isMetaFile(paths.get(i))) {
offset = i;
break;
}
@@ -580,7 +580,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
}
/*
- * Next part is related to HLogSplitterHandler
+ * Next part is related to WALSplitterHandler
*/
/**
* endTask() can fail and the only way to recover out of it is for the
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index 8b26eea..215ff16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Pair;
@@ -444,14 +445,32 @@ public abstract class BaseRegionObserver implements RegionObserver {
final InternalScanner s) throws IOException {
}
+ /**
+ * Implementers should override this version of the method and leave the deprecated one as-is.
+ */
+ @Override
+ public void preWALRestore(ObserverContext extends RegionCoprocessorEnvironment> env,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ }
+
@Override
public void preWALRestore(ObserverContext env, HRegionInfo info,
HLogKey logKey, WALEdit logEdit) throws IOException {
+ preWALRestore(env, info, (WALKey)logKey, logEdit);
+ }
+
+ /**
+ * Implementers should override this version of the method and leave the deprecated one as-is.
+ */
+ @Override
+ public void postWALRestore(ObserverContext extends RegionCoprocessorEnvironment> env,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
}
@Override
public void postWALRestore(ObserverContext env,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
+ postWALRestore(env, info, (WALKey)logKey, logEdit);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseWALObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseWALObserver.java
index 0836da9..cfddcd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseWALObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseWALObserver.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
/**
@@ -42,13 +43,31 @@ public class BaseWALObserver implements WALObserver {
@Override
public void stop(CoprocessorEnvironment e) throws IOException { }
+ /**
+ * Implementers should override this method and leave the deprecated version as-is.
+ */
+ @Override
+ public boolean preWALWrite(ObserverContext extends WALCoprocessorEnvironment> ctx,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ return false;
+ }
+
@Override
public boolean preWALWrite(ObserverContext ctx, HRegionInfo info,
HLogKey logKey, WALEdit logEdit) throws IOException {
- return false;
+ return preWALWrite(ctx, info, (WALKey)logKey, logEdit);
}
+ /**
+ * Implementers should override this method and leave the deprecated version as-is.
+ */
+ @Override
+ public void postWALWrite(ObserverContext extends WALCoprocessorEnvironment> ctx,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { }
+
@Override
public void postWALWrite(ObserverContext ctx, HRegionInfo info,
- HLogKey logKey, WALEdit logEdit) throws IOException { }
+ HLogKey logKey, WALEdit logEdit) throws IOException {
+ postWALWrite(ctx, info, (WALKey)logKey, logEdit);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index 7470d85..312a3ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@@ -556,4 +557,77 @@ public abstract class CoprocessorHost {
"coprocessor set.", e);
}
}
+
+ /**
+ * Used to gracefully handle fallback to deprecated methods when we
+ * evolve coprocessor APIs.
+ *
+ * When a particular Coprocessor API is updated to change methods, hosts can support fallback
+ * to the deprecated API by using this method to determine if an instance implements the new API.
+ * In the event that said support is partial, then in the face of a runtime issue that prevents
+ * proper operation {@link #legacyWarning(Class, String)} should be used to let operators know.
+ *
+ * For examples of this in action, see the implementation of
+ *
+ *
+ * @param clazz Coprocessor you wish to evaluate
+ * @param methodName the name of the non-deprecated method version
+ * @param parameterTypes the Class of the non-deprecated method's arguments in the order they are
+ * declared.
+ */
+ @InterfaceAudience.Private
+ protected static boolean useLegacyMethod(final Class extends Coprocessor> clazz,
+ final String methodName, final Class>... parameterTypes) {
+ boolean useLegacy;
+ // Use reflection to see if they implement the non-deprecated version
+ try {
+ clazz.getDeclaredMethod(methodName, parameterTypes);
+ LOG.debug("Found an implementation of '" + methodName + "' that uses updated method " +
+ "signature. Skipping legacy support for invocations in '" + clazz +"'.");
+ useLegacy = false;
+ } catch (NoSuchMethodException exception) {
+ useLegacy = true;
+ } catch (SecurityException exception) {
+ LOG.warn("The Security Manager denied our attempt to detect if the coprocessor '" + clazz +
+ "' requires legacy support; assuming it does. If you get later errors about legacy " +
+ "coprocessor use, consider updating your security policy to allow access to the package" +
+ " and declared members of your implementation.");
+ LOG.debug("Details of Security Manager rejection.", exception);
+ useLegacy = true;
+ }
+ return useLegacy;
+ }
+
+ /**
+ * Used to limit legacy handling to once per Coprocessor class per classloader.
+ */
+ private static final Set> legacyWarning =
+ new ConcurrentSkipListSet>(
+ new Comparator>() {
+ @Override
+ public int compare(Class extends Coprocessor> c1, Class extends Coprocessor> c2) {
+ if (c1.equals(c2)) {
+ return 0;
+ }
+ return c1.getName().compareTo(c2.getName());
+ }
+ });
+
+ /**
+ * limits the amount of logging to once per coprocessor class.
+ * Used in concert with {@link #useLegacyMethod(Class, String, Class[])} when a runtime issue
+ * prevents properly supporting the legacy version of a coprocessor API.
+ * Since coprocessors can be in tight loops this serves to limit the amount of log spam we create.
+ */
+ @InterfaceAudience.Private
+ protected void legacyWarning(final Class extends Coprocessor> clazz, final String message) {
+ if(legacyWarning.add(clazz)) {
+ LOG.error("You have a legacy coprocessor loaded and there are events we can't map to the " +
+ " deprecated API. Your coprocessor will not see these events. Please update '" + clazz +
+ "'. Details of the problem: " + message);
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 3b25455..a8b20ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Pair;
@@ -64,6 +65,9 @@ import com.google.common.collect.ImmutableList;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
+// TODO as method signatures need to break, update to
+// ObserverContext extends RegionCoprocessorEnvironment>
+// so we can use additional environment state that isn't exposed to coprocessors.
public interface RegionObserver extends Coprocessor {
/** Mutation type for postMutationBeforeWAL hook */
@@ -1106,26 +1110,62 @@ public interface RegionObserver extends Coprocessor {
/**
* Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
* replayed for this region.
+ */
+  void preWALRestore(final ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException;
+
+ /**
+ * Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+ * replayed for this region.
*
- * @param ctx
- * @param info
- * @param logKey
- * @param logEdit
- * @throws IOException
+ * This method is left in place to maintain binary compatibility with older
+ * {@link RegionObserver}s. If an implementation directly overrides
+ * {@link #preWALRestore(ObserverContext, HRegionInfo, WALKey, WALEdit)} then this version
+ * won't be called at all, barring problems with the Security Manager. To work correctly
+ * in the presence of a strict Security Manager, or in the case of an implementation that
+ * relies on a parent class to implement preWALRestore, you should implement this method
+ * as a call to the non-deprecated version.
+ *
+ * Users of this method will see all edits that can be treated as HLogKey. If there are
+ * edits that can't be treated as HLogKey they won't be offered to coprocessors that rely
+ * on this method. If a coprocessor gets skipped because of this mechanism, a log message
+ * at ERROR will be generated per coprocessor on the logger for {@link CoprocessorHost} once per
+ * classloader.
+ *
+ * @deprecated use {@link #preWALRestore(ObserverContext, HRegionInfo, WALKey, WALEdit)}
*/
+ @Deprecated
void preWALRestore(final ObserverContext<RegionCoprocessorEnvironment> ctx,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
/**
* Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
* replayed for this region.
+ */
+  void postWALRestore(final ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException;
+
+ /**
+ * Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+ * replayed for this region.
*
- * @param ctx
- * @param info
- * @param logKey
- * @param logEdit
- * @throws IOException
+ * This method is left in place to maintain binary compatibility with older
+ * {@link RegionObserver}s. If an implementation directly overrides
+ * {@link #postWALRestore(ObserverContext, HRegionInfo, WALKey, WALEdit)} then this version
+ * won't be called at all, barring problems with the Security Manager. To work correctly
+ * in the presence of a strict Security Manager, or in the case of an implementation that
+   * relies on a parent class to implement postWALRestore, you should implement this method
+ * as a call to the non-deprecated version.
+ *
+ * Users of this method will see all edits that can be treated as HLogKey. If there are
+ * edits that can't be treated as HLogKey they won't be offered to coprocessors that rely
+ * on this method. If a coprocessor gets skipped because of this mechanism, a log message
+ * at ERROR will be generated per coprocessor on the logger for {@link CoprocessorHost} once per
+ * classloader.
+ *
+ * @deprecated use {@link #postWALRestore(ObserverContext, HRegionInfo, WALKey, WALEdit)}
*/
+ @Deprecated
void postWALRestore(final ObserverContext<RegionCoprocessorEnvironment> ctx,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
index d16eed8..a4ce5f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
@@ -23,11 +23,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface WALCoprocessorEnvironment extends CoprocessorEnvironment {
- /** @return reference to the region server services */
- HLog getWAL();
+ /** @return reference to the region server's WAL */
+ WAL getWAL();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
index 49d84ed..bba83cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import java.io.IOException;
@@ -40,7 +41,7 @@ import java.io.IOException;
* hooks for adding logic for WALEdits in the region context during reconstruction,
*
* Defines coprocessor hooks for interacting with operations on the
- * {@link org.apache.hadoop.hbase.regionserver.wal.HLog}.
+ * {@link org.apache.hadoop.hbase.wal.WAL}.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
@@ -50,27 +51,65 @@ public interface WALObserver extends Coprocessor {
* Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
* is writen to WAL.
*
- * @param ctx
- * @param info
- * @param logKey
- * @param logEdit
* @return true if default behavior should be bypassed, false otherwise
- * @throws IOException
*/
// TODO: return value is not used
+  boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException;
+
+ /**
+ * Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+   * is written to WAL.
+ *
+ * This method is left in place to maintain binary compatibility with older
+ * {@link WALObserver}s. If an implementation directly overrides
+ * {@link #preWALWrite(ObserverContext, HRegionInfo, WALKey, WALEdit)} then this version
+ * won't be called at all, barring problems with the Security Manager. To work correctly
+ * in the presence of a strict Security Manager, or in the case of an implementation that
+ * relies on a parent class to implement preWALWrite, you should implement this method
+ * as a call to the non-deprecated version.
+ *
+ * Users of this method will see all edits that can be treated as HLogKey. If there are
+ * edits that can't be treated as HLogKey they won't be offered to coprocessors that rely
+ * on this method. If a coprocessor gets skipped because of this mechanism, a log message
+ * at ERROR will be generated per coprocessor on the logger for {@link CoprocessorHost} once per
+ * classloader.
+ *
+ * @return true if default behavior should be bypassed, false otherwise
+ * @deprecated use {@link #preWALWrite(ObserverContext, HRegionInfo, WALKey, WALEdit)}
+ */
+ @Deprecated
boolean preWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
/**
* Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
* is writen to WAL.
+ */
+  void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException;
+
+ /**
+ * Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+   * is written to WAL.
+ *
+ * This method is left in place to maintain binary compatibility with older
+ * {@link WALObserver}s. If an implementation directly overrides
+ * {@link #postWALWrite(ObserverContext, HRegionInfo, WALKey, WALEdit)} then this version
+ * won't be called at all, barring problems with the Security Manager. To work correctly
+ * in the presence of a strict Security Manager, or in the case of an implementation that
+   * relies on a parent class to implement postWALWrite, you should implement this method
+ * as a call to the non-deprecated version.
+ *
+ * Users of this method will see all edits that can be treated as HLogKey. If there are
+ * edits that can't be treated as HLogKey they won't be offered to coprocessors that rely
+ * on this method. If a coprocessor gets skipped because of this mechanism, a log message
+ * at ERROR will be generated per coprocessor on the logger for {@link CoprocessorHost} once per
+ * classloader.
*
- * @param ctx
- * @param info
- * @param logKey
- * @param logEdit
- * @throws IOException
+ * @deprecated use {@link #postWALWrite(ObserverContext, HRegionInfo, WALKey, WALEdit)}
*/
+ @Deprecated
void postWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
index f8cf7b3..fb58360 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -54,8 +54,8 @@ import org.apache.hadoop.util.ReflectionUtils;
/**
* An encapsulation for the FileSystem object that hbase uses to access
* data. This class allows the flexibility of using
- * separate filesystem objects for reading and writing hfiles and hlogs.
- * In future, if we want to make hlogs be in a different filesystem,
+ * separate filesystem objects for reading and writing hfiles and wals.
+ * In future, if we want to make wals be in a different filesystem,
* this is the place to make it happen.
*/
public class HFileSystem extends FilterFileSystem {
@@ -322,7 +322,7 @@ public class HFileSystem extends FilterFileSystem {
}
/**
- * We're putting at lowest priority the hlog files blocks that are on the same datanode
+ * We're putting at lowest priority the wal files blocks that are on the same datanode
* as the original regionserver which created these files. This because we fear that the
* datanode is actually dead, so if we use it it will timeout.
*/
@@ -330,17 +330,17 @@ public class HFileSystem extends FilterFileSystem {
public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
throws IOException {
- ServerName sn = HLogUtil.getServerNameFromHLogDirectoryName(conf, src);
+ ServerName sn = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, src);
if (sn == null) {
- // It's not an HLOG
+      // It's not a WAL
return;
}
- // Ok, so it's an HLog
+    // Ok, so it's a WAL
String hostName = sn.getHostname();
if (LOG.isTraceEnabled()) {
LOG.trace(src +
- " is an HLog file, so reordering blocks, last hostname will be:" + hostName);
+ " is an WAL file, so reordering blocks, last hostname will be:" + hostName);
}
// Just check for all blocks
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HLogLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HLogLink.java
deleted file mode 100644
index e62eb14..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HLogLink.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.io;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-/**
- * HLogLink describes a link to a WAL.
- *
- * An hlog can be in /hbase/.logs/<serverName>/<logName>
- * or it can be in /hbase/.oldlogs/<logName>
- *
- * The link checks first in the original path,
- * if it is not present it fallbacks to the archived path.
- */
-@InterfaceAudience.Private
-public class HLogLink extends FileLink {
- /**
- * @param conf {@link Configuration} from which to extract specific archive locations
- * @param serverName Region Server owner of the log
- * @param logName WAL file name
- * @throws IOException on unexpected error.
- */
- public HLogLink(final Configuration conf,
- final String serverName, final String logName) throws IOException {
- this(FSUtils.getRootDir(conf), serverName, logName);
- }
-
- /**
- * @param rootDir Path to the root directory where hbase files are stored
- * @param serverName Region Server owner of the log
- * @param logName WAL file name
- */
- public HLogLink(final Path rootDir, final String serverName, final String logName) {
- final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- final Path logDir = new Path(new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
- setLocations(new Path(logDir, logName), new Path(oldLogDir, logName));
- }
-
- /**
- * @param originPath Path to the wal in the log directory
- * @param archivePath Path to the wal in the archived log directory
- */
- public HLogLink(final Path originPath, final Path archivePath) {
- setLocations(originPath, archivePath);
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
new file mode 100644
index 0000000..fc5bd5d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * WALLink describes a link to a WAL.
+ *
+ * A WAL can be in /hbase/.logs/<serverName>/<logName>
+ * or it can be in /hbase/.oldlogs/<logName>
+ *
+ * The link checks first in the original path,
+ * if it is not present it fallbacks to the archived path.
+ */
+@InterfaceAudience.Private
+public class WALLink extends FileLink {
+ /**
+ * @param conf {@link Configuration} from which to extract specific archive locations
+ * @param serverName Region Server owner of the log
+ * @param logName WAL file name
+ * @throws IOException on unexpected error.
+ */
+ public WALLink(final Configuration conf,
+ final String serverName, final String logName) throws IOException {
+ this(FSUtils.getRootDir(conf), serverName, logName);
+ }
+
+ /**
+ * @param rootDir Path to the root directory where hbase files are stored
+ * @param serverName Region Server owner of the log
+ * @param logName WAL file name
+ */
+ public WALLink(final Path rootDir, final String serverName, final String logName) {
+ final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ final Path logDir = new Path(new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
+ setLocations(new Path(logDir, logName), new Path(oldLogDir, logName));
+ }
+
+ /**
+ * @param originPath Path to the wal in the log directory
+ * @param archivePath Path to the wal in the archived log directory
+ */
+ public WALLink(final Path originPath, final Path archivePath) {
+ setLocations(originPath, archivePath);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 8ff4732..a4d2425 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -189,12 +189,12 @@ public class HFileOutputFormat2
rollWriters();
}
- // create a new HLog writer, if necessary
+ // create a new WAL writer, if necessary
if (wl == null || wl.writer == null) {
wl = getNewWriter(family, conf);
}
- // we now have the proper HLog writer. full steam ahead
+ // we now have the proper WAL writer. full steam ahead
kv.updateLatestStamp(this.now);
wl.writer.append(kv);
wl.written += length;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java
index 4f604f8..4ed0672 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java
@@ -17,26 +17,15 @@
*/
package org.apache.hadoop.hbase.mapreduce;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.EOFException;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@@ -44,227 +33,51 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
- * Simple {@link InputFormat} for {@link HLog} files.
+ * Simple {@link InputFormat} for {@link WAL} files.
+ * @deprecated use {@link WALInputFormat}
*/
+@Deprecated
@InterfaceAudience.Public
public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
private static final Log LOG = LogFactory.getLog(HLogInputFormat.class);
-
public static final String START_TIME_KEY = "hlog.start.time";
public static final String END_TIME_KEY = "hlog.end.time";
- /**
- * {@link InputSplit} for {@link HLog} files. Each split represent
- * exactly one log file.
- */
- static class HLogSplit extends InputSplit implements Writable {
- private String logFileName;
- private long fileSize;
- private long startTime;
- private long endTime;
-
- /** for serialization */
- public HLogSplit() {}
-
- /**
- * Represent an HLogSplit, i.e. a single HLog file.
- * Start- and EndTime are managed by the split, so that HLog files can be
- * filtered before WALEdits are passed to the mapper(s).
- * @param logFileName
- * @param fileSize
- * @param startTime
- * @param endTime
- */
- public HLogSplit(String logFileName, long fileSize, long startTime, long endTime) {
- this.logFileName = logFileName;
- this.fileSize = fileSize;
- this.startTime = startTime;
- this.endTime = endTime;
- }
-
- @Override
- public long getLength() throws IOException, InterruptedException {
- return fileSize;
- }
-
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- // TODO: Find the data node with the most blocks for this HLog?
- return new String[] {};
- }
-
- public String getLogFileName() {
- return logFileName;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public long getEndTime() {
- return endTime;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- logFileName = in.readUTF();
- fileSize = in.readLong();
- startTime = in.readLong();
- endTime = in.readLong();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(logFileName);
- out.writeLong(fileSize);
- out.writeLong(startTime);
- out.writeLong(endTime);
- }
-
- @Override
- public String toString() {
- return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
- }
- }
+ // Delegate to WALInputFormat for implementation.
+ private final WALInputFormat delegate = new WALInputFormat();
/**
- * {@link RecordReader} for an {@link HLog} file.
+ * {@link RecordReader} that pulls out the legacy HLogKey format directly.
*/
-  static class HLogRecordReader extends RecordReader<HLogKey, WALEdit> {
- private HLog.Reader reader = null;
- private HLog.Entry currentEntry = new HLog.Entry();
- private long startTime;
- private long endTime;
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- HLogSplit hsplit = (HLogSplit)split;
- Path logFile = new Path(hsplit.getLogFileName());
- Configuration conf = context.getConfiguration();
- LOG.info("Opening reader for "+split);
- try {
- this.reader = HLogFactory.createReader(logFile.getFileSystem(conf),
- logFile, conf);
- } catch (EOFException x) {
- LOG.info("Ignoring corrupted HLog file: " + logFile
- + " (This is normal when a RegionServer crashed.)");
- }
- this.startTime = hsplit.getStartTime();
- this.endTime = hsplit.getEndTime();
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (reader == null) return false;
-
- HLog.Entry temp;
- long i = -1;
- do {
- // skip older entries
- try {
- temp = reader.next(currentEntry);
- i++;
- } catch (EOFException x) {
- LOG.info("Corrupted entry detected. Ignoring the rest of the file."
- + " (This is normal when a RegionServer crashed.)");
- return false;
- }
- }
- while(temp != null && temp.getKey().getWriteTime() < startTime);
-
- if (temp == null) {
- if (i > 0) LOG.info("Skipped " + i + " entries.");
- LOG.info("Reached end of file.");
- return false;
- } else if (i > 0) {
- LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
- }
- boolean res = temp.getKey().getWriteTime() <= endTime;
- if (!res) {
- LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
- }
- return res;
- }
-
+  static class HLogKeyRecordReader extends WALInputFormat.WALRecordReader<HLogKey> {
@Override
public HLogKey getCurrentKey() throws IOException, InterruptedException {
- return currentEntry.getKey();
- }
-
- @Override
- public WALEdit getCurrentValue() throws IOException, InterruptedException {
- return currentEntry.getEdit();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- // N/A depends on total number of entries, which is unknown
- return 0;
- }
-
- @Override
- public void close() throws IOException {
- LOG.info("Closing reader");
- if (reader != null) this.reader.close();
+ if (!(currentEntry.getKey() instanceof HLogKey)) {
+ final IllegalStateException exception = new IllegalStateException(
+ "HLogInputFormat only works when given entries that have HLogKey for keys. This" +
+ " one had '" + currentEntry.getKey().getClass() + "'");
+ LOG.error("The deprecated HLogInputFormat has to work with the deprecated HLogKey class, " +
+ " but HBase internals read the wal entry using some other class." +
+ " This is a bug; please file an issue or email the developer mailing list. It is " +
+ "likely that you would not have this problem if you updated to use WALInputFormat. " +
+ "You will need the following exception details when seeking help from the HBase " +
+ "community.",
+ exception);
+ throw exception;
+ }
+ return (HLogKey)currentEntry.getKey();
}
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException,
InterruptedException {
- Configuration conf = context.getConfiguration();
- Path inputDir = new Path(conf.get("mapreduce.input.fileinputformat.inputdir"));
-
- long startTime = conf.getLong(START_TIME_KEY, Long.MIN_VALUE);
- long endTime = conf.getLong(END_TIME_KEY, Long.MAX_VALUE);
-
- FileSystem fs = inputDir.getFileSystem(conf);
-    List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);
-
-    List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
- for (FileStatus file : files) {
- splits.add(new HLogSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
- }
- return splits;
- }
-
-  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
- throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
- LOG.debug("Scanning " + dir.toString() + " for HLog files");
-
- FileStatus[] files = fs.listStatus(dir);
- if (files == null) return Collections.emptyList();
- for (FileStatus file : files) {
- if (file.isDirectory()) {
- // recurse into sub directories
- result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
- } else {
- String name = file.getPath().toString();
- int idx = name.lastIndexOf('.');
- if (idx > 0) {
- try {
- long fileStartTime = Long.parseLong(name.substring(idx+1));
- if (fileStartTime <= endTime) {
- LOG.info("Found: " + name);
- result.add(file);
- }
- } catch (NumberFormatException x) {
- idx = 0;
- }
- }
- if (idx == 0) {
- LOG.warn("File " + name + " does not appear to be an HLog file. Skipping...");
- }
- }
- }
- return result;
+ return delegate.getSplits(context, START_TIME_KEY, END_TIME_KEY);
}
@Override
public RecordReader<HLogKey, WALEdit> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
- return new HLogRecordReader();
+ return new HLogKeyRecordReader();
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
index c1d8373..62a9626 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
*
*
*
- * Write-ahead logging (HLog) for Puts can be disabled by setting
+ * Write-ahead logging (WAL) for Puts can be disabled by setting
* {@link #WAL_PROPERTY} to {@link #WAL_OFF}. Default value is {@link #WAL_ON}.
* Note that disabling write-ahead logging is only appropriate for jobs where
* loss of data due to region server failure can be tolerated (for example,
@@ -61,7 +61,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
- /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (HLog) */
+ /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
/** Property value to use write-ahead logging */
public static final boolean WAL_ON = true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index 1c804d7..44d88c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -46,7 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
/**
* TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
* bypasses HBase servers, and directly accesses the underlying files (hfile, recovered edits,
- * hlogs, etc) directly to provide maximum performance. The snapshot is not required to be
+ * wals, etc) directly to provide maximum performance. The snapshot is not required to be
* restored to the live cluster or cloned. This also allows to run the mapreduce job from an
* online or offline hbase cluster. The snapshot files can be exported by using the
* {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
new file mode 100644
index 0000000..02fcbba
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
+ */
+@InterfaceAudience.Public
+public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
+ private static final Log LOG = LogFactory.getLog(WALInputFormat.class);
+
+ public static final String START_TIME_KEY = "wal.start.time";
+ public static final String END_TIME_KEY = "wal.end.time";
+
+ /**
+ * {@link InputSplit} for {@link WAL} files. Each split represents
+ * exactly one log file.
+ */
+ static class WALSplit extends InputSplit implements Writable {
+ private String logFileName;
+ private long fileSize;
+ private long startTime;
+ private long endTime;
+
+ /** for serialization */
+ public WALSplit() {}
+
+ /**
+ * Represents a WALSplit, i.e. a single WAL file.
+ * Start- and EndTime are managed by the split, so that WAL files can be
+ * filtered before WALEdits are passed to the mapper(s).
+ * @param logFileName
+ * @param fileSize
+ * @param startTime
+ * @param endTime
+ */
+ public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
+ this.logFileName = logFileName;
+ this.fileSize = fileSize;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return fileSize;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ // TODO: Find the data node with the most blocks for this WAL?
+ return new String[] {};
+ }
+
+ public String getLogFileName() {
+ return logFileName;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ logFileName = in.readUTF();
+ fileSize = in.readLong();
+ startTime = in.readLong();
+ endTime = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(logFileName);
+ out.writeLong(fileSize);
+ out.writeLong(startTime);
+ out.writeLong(endTime);
+ }
+
+ @Override
+ public String toString() {
+ return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
+ }
+ }
+
+ /**
+ * {@link RecordReader} for a {@link WAL} file.
+ * Implementation shared with deprecated HLogInputFormat.
+ */
+ static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
+ private Reader reader = null;
+ // visible until we can remove the deprecated HLogInputFormat
+ Entry currentEntry = new Entry();
+ private long startTime;
+ private long endTime;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ WALSplit hsplit = (WALSplit)split;
+ Path logFile = new Path(hsplit.getLogFileName());
+ Configuration conf = context.getConfiguration();
+ LOG.info("Opening reader for "+split);
+ try {
+ this.reader = WALFactory.createReader(logFile.getFileSystem(conf), logFile, conf);
+ } catch (EOFException x) {
+ LOG.info("Ignoring corrupted WAL file: " + logFile
+ + " (This is normal when a RegionServer crashed.)");
+ this.reader = null;
+ }
+ this.startTime = hsplit.getStartTime();
+ this.endTime = hsplit.getEndTime();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (reader == null) return false;
+
+ Entry temp;
+ long i = -1;
+ do {
+ // skip older entries
+ try {
+ temp = reader.next(currentEntry);
+ i++;
+ } catch (EOFException x) {
+ LOG.info("Corrupted entry detected. Ignoring the rest of the file."
+ + " (This is normal when a RegionServer crashed.)");
+ return false;
+ }
+ }
+ while(temp != null && temp.getKey().getWriteTime() < startTime);
+
+ if (temp == null) {
+ if (i > 0) LOG.info("Skipped " + i + " entries.");
+ LOG.info("Reached end of file.");
+ return false;
+ } else if (i > 0) {
+ LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
+ }
+ boolean res = temp.getKey().getWriteTime() <= endTime;
+ if (!res) {
+ LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
+ }
+ return res;
+ }
+
+ @Override
+ public WALEdit getCurrentValue() throws IOException, InterruptedException {
+ return currentEntry.getEdit();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ // N/A depends on total number of entries, which is unknown
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ LOG.info("Closing reader");
+ if (reader != null) this.reader.close();
+ }
+ }
+
+ /**
+ * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer
+ * need to support HLogInputFormat.
+ */
+ static class WALKeyRecordReader extends WALRecordReader<WALKey> {
+ @Override
+ public WALKey getCurrentKey() throws IOException, InterruptedException {
+ return currentEntry.getKey();
+ }
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException,
+ InterruptedException {
+ return getSplits(context, START_TIME_KEY, END_TIME_KEY);
+ }
+
+ /**
+ * implementation shared with deprecated HLogInputFormat
+ */
+ List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ Path inputDir = new Path(conf.get("mapreduce.input.fileinputformat.inputdir"));
+
+ long startTime = conf.getLong(startKey, Long.MIN_VALUE);
+ long endTime = conf.getLong(endKey, Long.MAX_VALUE);
+
+ FileSystem fs = inputDir.getFileSystem(conf);
+ List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);
+
+ List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
+ for (FileStatus file : files) {
+ splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
+ }
+ return splits;
+ }
+
+ private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
+ throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ LOG.debug("Scanning " + dir.toString() + " for WAL files");
+
+ FileStatus[] files = fs.listStatus(dir);
+ if (files == null) return Collections.emptyList();
+ for (FileStatus file : files) {
+ if (file.isDirectory()) {
+ // recurse into sub directories
+ result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
+ } else {
+ String name = file.getPath().toString();
+ int idx = name.lastIndexOf('.');
+ if (idx > 0) {
+ try {
+ long fileStartTime = Long.parseLong(name.substring(idx+1));
+ if (fileStartTime <= endTime) {
+ LOG.info("Found: " + name);
+ result.add(file);
+ }
+ } catch (NumberFormatException x) {
+ idx = 0;
+ }
+ }
+ if (idx == 0) {
+ LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ return new WALKeyRecordReader();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 415d14c..a487878 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -23,6 +23,8 @@ import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -39,7 +41,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
@@ -63,22 +65,32 @@ import org.apache.hadoop.util.ToolRunner;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class WALPlayer extends Configured implements Tool {
+ final static Log LOG = LogFactory.getLog(WALPlayer.class);
final static String NAME = "WALPlayer";
- final static String BULK_OUTPUT_CONF_KEY = "hlog.bulk.output";
- final static String HLOG_INPUT_KEY = "hlog.input.dir";
- final static String TABLES_KEY = "hlog.input.tables";
- final static String TABLE_MAP_KEY = "hlog.input.tablesmap";
+ final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
+ final static String TABLES_KEY = "wal.input.tables";
+ final static String TABLE_MAP_KEY = "wal.input.tablesmap";
+
+ // This relies on Hadoop Configuration to handle warning about deprecated configs and
+ // to set the correct non-deprecated configs when an old one shows up.
+ static {
+ Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY);
+ Configuration.addDeprecation("hlog.input.tables", TABLES_KEY);
+ Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY);
+ Configuration.addDeprecation(HLogInputFormat.START_TIME_KEY, WALInputFormat.START_TIME_KEY);
+ Configuration.addDeprecation(HLogInputFormat.END_TIME_KEY, WALInputFormat.END_TIME_KEY);
+ }
/**
* A mapper that just writes out KeyValues.
* This one can be used together with {@link KeyValueSortReducer}
*/
- static class HLogKeyValueMapper
- extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue> {
+ static class WALKeyValueMapper
+ extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
private byte[] table;
@Override
- public void map(HLogKey key, WALEdit value,
+ public void map(WALKey key, WALEdit value,
Context context)
throws IOException {
try {
@@ -100,7 +112,7 @@ public class WALPlayer extends Configured implements Tool {
// only a single table is supported when HFiles are generated with HFileOutputFormat
String tables[] = context.getConfiguration().getStrings(TABLES_KEY);
if (tables == null || tables.length != 1) {
- // this can only happen when HLogMapper is used directly by a class other than WALPlayer
+ // this can only happen when WALMapper is used directly by a class other than WALPlayer
throw new IOException("Exactly one table must be specified for bulk HFile case.");
}
table = Bytes.toBytes(tables[0]);
@@ -111,13 +123,13 @@ public class WALPlayer extends Configured implements Tool {
* A mapper that writes out {@link Mutation} to be directly applied to
* a running HBase instance.
*/
- static class HLogMapper
- extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, Mutation> {
+ static class WALMapper
+ extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
private Map<TableName, TableName> tables =
new TreeMap<TableName, TableName>();
@Override
- public void map(HLogKey key, WALEdit value,
+ public void map(WALKey key, WALEdit value,
Context context)
throws IOException {
try {
@@ -130,7 +142,7 @@ public class WALPlayer extends Configured implements Tool {
Delete del = null;
Cell lastCell = null;
for (Cell cell : value.getCells()) {
- // filtering HLog meta entries
+ // filtering WAL meta entries
if (WALEdit.isMetaEditFamily(cell.getFamily())) continue;
// A WALEdit may contain multiple operations (HBASE-3584) and/or
@@ -170,7 +182,7 @@ public class WALPlayer extends Configured implements Tool {
String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
- // this can only happen when HLogMapper is used directly by a class other than WALPlayer
+ // this can only happen when WALMapper is used directly by a class other than WALPlayer
throw new IOException("No tables or incorrect table mapping specified.");
}
int i = 0;
@@ -190,7 +202,7 @@ public class WALPlayer extends Configured implements Tool {
void setupTime(Configuration conf, String option) throws IOException {
String val = conf.get(option);
- if (val == null) return;
+ if (null == val) return;
long ms;
try {
// first try to parse in user friendly form
@@ -237,7 +249,7 @@ public class WALPlayer extends Configured implements Tool {
Job job = new Job(conf, NAME + "_" + inputDir);
job.setJarByClass(WALPlayer.class);
FileInputFormat.setInputPaths(job, inputDir);
- job.setInputFormatClass(HLogInputFormat.class);
+ job.setInputFormatClass(WALInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
if (hfileOutPath != null) {
@@ -246,7 +258,7 @@ public class WALPlayer extends Configured implements Tool {
throw new IOException("Exactly one table must be specified for the bulk export option");
}
HTable table = new HTable(conf, TableName.valueOf(tables[0]));
- job.setMapperClass(HLogKeyValueMapper.class);
+ job.setMapperClass(WALKeyValueMapper.class);
job.setReducerClass(KeyValueSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
@@ -256,7 +268,7 @@ public class WALPlayer extends Configured implements Tool {
com.google.common.base.Preconditions.class);
} else {
// output to live cluster
- job.setMapperClass(HLogMapper.class);
+ job.setMapperClass(WALMapper.class);
job.setOutputFormatClass(MultiTableOutputFormat.class);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
@@ -286,8 +298,8 @@ public class WALPlayer extends Configured implements Tool {
System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
System.err.println(" (Only one table can be specified, and no mapping is allowed!)");
System.err.println("Other options: (specify time range to WAL edit to consider)");
- System.err.println(" -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]");
- System.err.println(" -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]");
+ System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
+ System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
System.err.println("For performance also consider the following options:\n"
+ " -Dmapreduce.map.speculative=false\n"
+ " -Dmapreduce.reduce.speculative=false");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 6657a9e..9bc75e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -90,9 +90,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.ConfigUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
@@ -558,17 +557,20 @@ public class AssignmentManager extends ZooKeeperListener {
}
if (!failover) {
// If we get here, we have a full cluster restart. It is a failover only
- // if there are some HLogs are not split yet. For meta HLogs, they should have
+ // if there are some WALs are not split yet. For meta WALs, they should have
// been split already, if any. We can walk through those queued dead servers,
- // if they don't have any HLogs, this restart should be considered as a clean one
+ // if they don't have any WALs, this restart should be considered as a clean one
Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
if (!queuedDeadServers.isEmpty()) {
Configuration conf = server.getConfiguration();
Path rootdir = FSUtils.getRootDir(conf);
FileSystem fs = rootdir.getFileSystem(conf);
for (ServerName serverName: queuedDeadServers) {
- Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
- Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
+ // In the case of a clean exit, the shutdown handler would have presplit any WALs and
+ // removed empty directories.
+ Path logDir = new Path(rootdir,
+ DefaultWALProvider.getWALDirectoryName(serverName.toString()));
+ Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
if (fs.exists(logDir) || fs.exists(splitDir)) {
LOG.debug("Found queued dead server " + serverName);
failover = true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 3affa5a..2532876 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -50,8 +50,8 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
@@ -92,14 +92,14 @@ public class MasterFileSystem {
final static PathFilter META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
- return HLogUtil.isMetaFile(p);
+ return DefaultWALProvider.isMetaFile(p);
}
};
final static PathFilter NON_META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
- return !HLogUtil.isMetaFile(p);
+ return !DefaultWALProvider.isMetaFile(p);
}
};
@@ -214,7 +214,7 @@ public class MasterFileSystem {
*/
Set<ServerName> getFailedServersFromLogFolders() {
boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
- HLog.SPLIT_SKIP_ERRORS_DEFAULT);
+ WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
Set<ServerName> serverNames = new HashSet<ServerName>();
Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
@@ -237,13 +237,13 @@ public class MasterFileSystem {
return serverNames;
}
for (FileStatus status : logFolders) {
- String sn = status.getPath().getName();
- // truncate splitting suffix if present (for ServerName parsing)
- if (sn.endsWith(HLog.SPLITTING_EXT)) {
- sn = sn.substring(0, sn.length() - HLog.SPLITTING_EXT.length());
- }
- ServerName serverName = ServerName.parseServerName(sn);
- if (!onlineServers.contains(serverName)) {
+ final ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(
+ status.getPath());
+ if (null == serverName) {
+ LOG.warn("Log folder " + status.getPath() + " doesn't look like its name includes a " +
+ "region server name; leaving in place. If you see later errors about missing " +
+ "write ahead logs they may be saved in this location.");
+ } else if (!onlineServers.contains(serverName)) {
LOG.info("Log folder " + status.getPath() + " doesn't belong "
+ "to a known region server, splitting");
serverNames.add(serverName);
@@ -281,7 +281,7 @@ public class MasterFileSystem {
}
/**
- * Specialized method to handle the splitting for meta HLog
+ * Specialized method to handle the splitting for meta WAL
* @param serverName
* @throws IOException
*/
@@ -292,7 +292,7 @@ public class MasterFileSystem {
}
/**
- * Specialized method to handle the splitting for meta HLog
+ * Specialized method to handle the splitting for meta WAL
* @param serverNames
* @throws IOException
*/
@@ -300,6 +300,9 @@ public class MasterFileSystem {
splitLog(serverNames, META_FILTER);
}
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification=
+ "We only release this lock when we set it. Updates to code that uses it should verify use " +
+ "of the guard boolean.")
private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
List<Path> logDirs = new ArrayList<Path>();
boolean needReleaseLock = false;
@@ -310,9 +313,10 @@ public class MasterFileSystem {
}
try {
for (ServerName serverName : serverNames) {
- Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
- Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
- // Rename the directory so a rogue RS doesn't create more HLogs
+ Path logDir = new Path(this.rootdir,
+ DefaultWALProvider.getWALDirectoryName(serverName.toString()));
+ Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
+ // Rename the directory so a rogue RS doesn't create more WALs
if (fs.exists(logDir)) {
if (!this.fs.rename(logDir, splitDir)) {
throw new IOException("Failed fs.rename for log split: " + logDir);
@@ -365,9 +369,10 @@ public class MasterFileSystem {
}
/**
- * This method is the base split method that splits HLog files matching a filter. Callers should
- * pass the appropriate filter for meta and non-meta HLogs.
- * @param serverNames
+ * This method is the base split method that splits WAL files matching a filter. Callers should
+ * pass the appropriate filter for meta and non-meta WALs.
+ * @param serverNames logs belonging to these servers will be split; this will rename the log
+ * directory out from under a soft-failed server
* @param filter
* @throws IOException
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterFileSystem.java
index 34547ef..45dbeb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterFileSystem.java
@@ -31,7 +31,7 @@ public class MetricsMasterFileSystem {
/**
* Record a single instance of a split
* @param time time that the split took
- * @param size length of original HLogs that were split
+ * @param size length of original WALs that were split
*/
public synchronized void addSplit(long time, long size) {
source.updateSplitTime(time);
@@ -41,7 +41,7 @@ public class MetricsMasterFileSystem {
/**
* Record a single instance of a split
* @param time time that the split took
- * @param size length of original HLogs that were split
+ * @param size length of original WALs that were split
*/
public synchronized void addMetaWALSplit(long time, long size) {
source.updateMetaWALSplitTime(time);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index cfa924f..3c6c5a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -476,7 +476,7 @@ public class RegionStates {
}
/**
- * A dead server's hlogs have been split so that all the regions
+ * A dead server's wals have been split so that all the regions
* used to be open on it can be safely assigned now. Mark them assignable.
*/
public synchronized void logSplit(final ServerName serverName) {
@@ -722,7 +722,7 @@ public class RegionStates {
/**
* Checking if a region was assigned to a server which is not online now.
- * If so, we should hold re-assign this region till SSH has split its hlogs.
+ * If so, we should hold re-assign this region till SSH has split its wals.
* Once logs are split, the last assignment of this region will be reset,
* which means a null last assignment server is ok for re-assigning.
*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 627c68d..da4b827 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -589,7 +589,7 @@ public class ServerManager {
this.processDeadServer(serverName, false);
}
- public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitHlog) {
+ public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
// When assignment manager is cleaning up the zookeeper nodes and rebuilding the
// in-memory region states, region servers could be down. Meta table can and
// should be re-assigned, log splitting can be done too. However, it is better to
@@ -599,14 +599,14 @@ public class ServerManager {
// the handler threads and meta table could not be re-assigned in case
// the corresponding server is down. So we queue them up here instead.
if (!services.getAssignmentManager().isFailoverCleanupDone()) {
- requeuedDeadServers.put(serverName, shouldSplitHlog);
+ requeuedDeadServers.put(serverName, shouldSplitWal);
return;
}
this.deadservers.add(serverName);
this.services.getExecutorService().submit(
new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
- shouldSplitHlog));
+ shouldSplitWal));
}
/**
@@ -953,7 +953,7 @@ public class ServerManager {
/**
* During startup, if we figure it is not a failover, i.e. there is
- * no more HLog files to split, we won't try to recover these dead servers.
+ * no more WAL files to split, we won't try to recover these dead servers.
* So we just remove them from the queue. Use caution in calling this.
*/
void removeRequeuedDeadServers() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index bd3dceb..23ef6a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLog
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
@@ -101,8 +101,7 @@ public class SplitLogManager {
private Server server;
private final Stoppable stopper;
- private FileSystem fs;
- private Configuration conf;
+ private final Configuration conf;
public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
@@ -160,16 +159,34 @@ public class SplitLogManager {
}
private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
+ return getFileList(conf, logDirs, filter);
+ }
+
+ /**
+ * Get a list of paths that need to be split given a set of server-specific directories and
+ * optionally a filter.
+ *
+ * See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on directory
+ * layout.
+ *
+ * Should be package-private, but is needed by
+ * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
+ * Configuration, WALFactory)} for tests.
+ */
+ @VisibleForTesting
+ public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
+ final PathFilter filter)
+ throws IOException {
List<FileStatus> fileStatus = new ArrayList<FileStatus>();
- for (Path hLogDir : logDirs) {
- this.fs = hLogDir.getFileSystem(conf);
- if (!fs.exists(hLogDir)) {
- LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
+ for (Path logDir : logDirs) {
+ final FileSystem fs = logDir.getFileSystem(conf);
+ if (!fs.exists(logDir)) {
+ LOG.warn(logDir + " doesn't exist. Nothing to do!");
continue;
}
- FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
+ FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
if (logfiles == null || logfiles.length == 0) {
- LOG.info(hLogDir + " is empty dir, no logs to split");
+ LOG.info(logDir + " is empty dir, no logs to split");
} else {
Collections.addAll(fileStatus, logfiles);
}
@@ -179,7 +196,7 @@ public class SplitLogManager {
}
/**
- * @param logDir one region sever hlog dir path in .logs
+ * @param logDir one region server wal dir path in .logs
* @throws IOException if there was an error while splitting any log file
* @return cumulative size of the logfiles split
* @throws IOException
@@ -205,7 +222,7 @@ public class SplitLogManager {
Set<ServerName> serverNames = new HashSet<ServerName>();
for (Path logDir : logDirs) {
try {
- ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
+ ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logDir);
if (serverName != null) {
serverNames.add(serverName);
}
@@ -272,6 +289,7 @@ public class SplitLogManager {
}
for (Path logDir : logDirs) {
status.setStatus("Cleaning up log directory...");
+ final FileSystem fs = logDir.getFileSystem(conf);
try {
if (fs.exists(logDir) && !fs.delete(logDir, false)) {
LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index 6c8e428..f68bfa2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
/**
- * This Chore, every time it runs, will attempt to delete the HLogs in the old logs folder. The HLog
+ * This Chore, every time it runs, will attempt to delete the WALs in the old logs folder. The WAL
* is only deleted if none of the cleaner delegates says otherwise.
* @see BaseLogCleanerDelegate
*/
@@ -51,6 +51,6 @@ public class LogCleaner extends CleanerChore {
@Override
protected boolean validate(Path file) {
- return HLogUtil.validateHLogFilename(file.getName());
+ return DefaultWALProvider.validateWALFilename(file.getName());
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
index 3a39fb4..9d68601 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
- * Log cleaner that uses the timestamp of the hlog to determine if it should
+ * Log cleaner that uses the timestamp of the wal to determine if it should
* be deleted. By default they are allowed to live for 10 minutes.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
index be6b391..0e72496 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
@@ -67,7 +67,7 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
boolean distributedLogReplay =
(this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
try {
- if (this.shouldSplitHlog) {
+ if (this.shouldSplitWal) {
LOG.info("Splitting hbase:meta logs for " + serverName);
if (distributedLogReplay) {
Set regions = new HashSet();
@@ -101,7 +101,7 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
}
try {
- if (this.shouldSplitHlog && distributedLogReplay) {
+ if (this.shouldSplitWal && distributedLogReplay) {
if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,
regionAssignmentWaitTimeout)) {
// Wait here is to avoid log replay hits current dead server and incur a RPC timeout
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index d506fe5..907d5ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -61,19 +61,19 @@ public class ServerShutdownHandler extends EventHandler {
protected final ServerName serverName;
protected final MasterServices services;
protected final DeadServer deadServers;
- protected final boolean shouldSplitHlog; // whether to split HLog or not
+ protected final boolean shouldSplitWal; // whether to split WAL or not
protected final int regionAssignmentWaitTimeout;
public ServerShutdownHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName,
- final boolean shouldSplitHlog) {
+ final boolean shouldSplitWal) {
this(server, services, deadServers, serverName, EventType.M_SERVER_SHUTDOWN,
- shouldSplitHlog);
+ shouldSplitWal);
}
ServerShutdownHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName, EventType type,
- final boolean shouldSplitHlog) {
+ final boolean shouldSplitWal) {
super(server, type);
this.serverName = serverName;
this.server = server;
@@ -82,7 +82,7 @@ public class ServerShutdownHandler extends EventHandler {
if (!this.deadServers.isDeadServer(this.serverName)) {
LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
}
- this.shouldSplitHlog = shouldSplitHlog;
+ this.shouldSplitWal = shouldSplitWal;
this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000);
}
@@ -138,7 +138,7 @@ public class ServerShutdownHandler extends EventHandler {
AssignmentManager am = services.getAssignmentManager();
ServerManager serverManager = services.getServerManager();
if (isCarryingMeta() /* hbase:meta */ || !am.isFailoverCleanupDone()) {
- serverManager.processDeadServer(serverName, this.shouldSplitHlog);
+ serverManager.processDeadServer(serverName, this.shouldSplitWal);
return;
}
@@ -200,7 +200,7 @@ public class ServerShutdownHandler extends EventHandler {
(this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
try {
- if (this.shouldSplitHlog) {
+ if (this.shouldSplitWal) {
if (distributedLogReplay) {
LOG.info("Mark regions in recovery for crashed server " + serverName +
" before assignment; regions=" + hris);
@@ -302,13 +302,13 @@ public class ServerShutdownHandler extends EventHandler {
throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
} catch (IOException ioe) {
LOG.info("Caught " + ioe + " during region assignment, will retry");
- // Only do HLog splitting if shouldSplitHlog and in DLR mode
+ // Only do wal splitting if shouldSplitWal and in DLR mode
serverManager.processDeadServer(serverName,
- this.shouldSplitHlog && distributedLogReplay);
+ this.shouldSplitWal && distributedLogReplay);
return;
}
- if (this.shouldSplitHlog && distributedLogReplay) {
+ if (this.shouldSplitWal && distributedLogReplay) {
// wait for region assignment completes
for (HRegionInfo hri : toAssignRegions) {
try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java
index d5e174d..a927db3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java
@@ -46,11 +46,11 @@ public class SnapshotLogCleaner extends BaseLogCleanerDelegate {
* Conf key for the frequency to attempt to refresh the cache of hfiles currently used in
* snapshots (ms)
*/
- static final String HLOG_CACHE_REFRESH_PERIOD_CONF_KEY =
+ static final String WAL_CACHE_REFRESH_PERIOD_CONF_KEY =
"hbase.master.hlogcleaner.plugins.snapshot.period";
/** Refresh cache, by default, every 5 minutes */
- private static final long DEFAULT_HLOG_CACHE_REFRESH_PERIOD = 300000;
+ private static final long DEFAULT_WAL_CACHE_REFRESH_PERIOD = 300000;
private SnapshotFileCache cache;
@@ -77,14 +77,14 @@ public class SnapshotLogCleaner extends BaseLogCleanerDelegate {
super.setConf(conf);
try {
long cacheRefreshPeriod = conf.getLong(
- HLOG_CACHE_REFRESH_PERIOD_CONF_KEY, DEFAULT_HLOG_CACHE_REFRESH_PERIOD);
+ WAL_CACHE_REFRESH_PERIOD_CONF_KEY, DEFAULT_WAL_CACHE_REFRESH_PERIOD);
final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
Path rootDir = FSUtils.getRootDir(conf);
cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod,
"snapshot-log-cleaner-cache-refresher", new SnapshotFileCache.SnapshotFileInspector() {
public Collection filesUnderSnapshot(final Path snapshotDir)
throws IOException {
- return SnapshotReferenceUtil.getHLogNames(fs, snapshotDir);
+ return SnapshotReferenceUtil.getWALNames(fs, snapshotDir);
}
});
} catch (IOException e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
index e34420d..1379378 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
@@ -48,9 +48,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.Bytes;
@@ -380,12 +379,11 @@ public class NamespaceUpgrade implements Tool {
ServerName fakeServer = ServerName.valueOf("nsupgrade", 96, 123);
- String metaLogName = HLogUtil.getHLogDirectoryName(fakeServer.toString());
- HLog metaHLog = HLogFactory.createMetaHLog(fs, rootDir,
- metaLogName, conf, null,
- fakeServer.toString());
+ final WALFactory walFactory = new WALFactory(conf, null, fakeServer.toString());
+ WAL metawal = walFactory.getMetaWAL(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+ FSTableDescriptors fst = new FSTableDescriptors(conf);
HRegion meta = HRegion.openHRegion(rootDir, HRegionInfo.FIRST_META_REGIONINFO,
- HTableDescriptor.META_TABLEDESC, metaHLog, conf);
+ fst.get(TableName.META_TABLE_NAME), metawal, conf);
HRegion region = null;
try {
for(Path regionDir : FSUtils.getRegionDirs(fs, oldTablePath)) {
@@ -402,7 +400,7 @@ public class NamespaceUpgrade implements Tool {
new HRegion(
HRegionFileSystem.openRegionFromFileSystem(conf, fs, oldTablePath,
oldRegionInfo, false),
- metaHLog,
+ metawal,
conf,
oldDesc,
null);
@@ -439,7 +437,7 @@ public class NamespaceUpgrade implements Tool {
meta.flushcache();
meta.waitForFlushesAndCompactions();
meta.close();
- metaHLog.closeAndDelete();
+ metawal.close();
if(region != null) {
region.close();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java
index 6df2eab..fc11823 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java
@@ -36,7 +36,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileV1Detector;
@@ -234,6 +235,8 @@ public class UpgradeTo96 extends Configured implements Tool {
LOG.info("Starting Log splitting");
final Path rootDir = FSUtils.getRootDir(getConf());
final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ // since this is the singleton, we needn't close it.
+ final WALFactory factory = WALFactory.getInstance(getConf());
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
FileStatus[] regionServerLogDirs = FSUtils.listStatus(fs, logDir);
@@ -244,7 +247,7 @@ public class UpgradeTo96 extends Configured implements Tool {
try {
for (FileStatus regionServerLogDir : regionServerLogDirs) {
// split its log dir, if exists
- HLogSplitter.split(rootDir, regionServerLogDir.getPath(), oldLogDir, fs, getConf());
+ WALSplitter.split(rootDir, regionServerLogDir.getPath(), oldLogDir, fs, getConf(), factory);
}
LOG.info("Successfully completed Log splitting");
} catch (Exception e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 8acce16..d68d247 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Pair;
@@ -50,14 +50,14 @@ import com.google.protobuf.ServiceException;
@InterfaceAudience.Private
public class ReplicationProtbufUtil {
/**
- * A helper to replicate a list of HLog entries using admin protocol.
+ * A helper to replicate a list of WAL entries using admin protocol.
*
* @param admin
* @param entries
* @throws java.io.IOException
*/
public static void replicateWALEntry(final AdminService.BlockingInterface admin,
- final HLog.Entry[] entries) throws IOException {
+ final Entry[] entries) throws IOException {
Pair p =
buildReplicateWALEntryRequest(entries);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
@@ -69,14 +69,14 @@ public class ReplicationProtbufUtil {
}
/**
- * Create a new ReplicateWALEntryRequest from a list of HLog entries
+ * Create a new ReplicateWALEntryRequest from a list of WAL entries
*
- * @param entries the HLog entries to be replicated
+ * @param entries the WAL entries to be replicated
* @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
* found.
*/
public static Pair
- buildReplicateWALEntryRequest(final HLog.Entry[] entries) {
+ buildReplicateWALEntryRequest(final Entry[] entries) {
// Accumulate all the Cells seen in here.
List> allCells = new ArrayList>(entries.length);
int size = 0;
@@ -85,11 +85,11 @@ public class ReplicationProtbufUtil {
AdminProtos.ReplicateWALEntryRequest.Builder builder =
AdminProtos.ReplicateWALEntryRequest.newBuilder();
HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
- for (HLog.Entry entry: entries) {
+ for (Entry entry: entries) {
entryBuilder.clear();
- // TODO: this duplicates a lot in HLogKey#getBuilder
+ // TODO: this duplicates a lot in WALKey#getBuilder
WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
- HLogKey key = entry.getKey();
+ WALKey key = entry.getKey();
keyBuilder.setEncodedRegionName(
ByteStringer.wrap(key.getEncodedRegionName()));
keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 288825f..9874683 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -127,12 +128,15 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Flus
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
@@ -226,13 +230,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
protected volatile long lastFlushSeqId = -1L;
/**
- * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog
+ * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
* file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
* Its default value is -1L. This default is used as a marker to indicate
* that the region hasn't opened yet. Once it is opened, it is set to the derived
* {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
*
- *
Control of this sequence is handed off to the WAL/HLog implementation. It is responsible
+ *
Control of this sequence is handed off to the WAL implementation. It is responsible
* for tagging edits with the correct sequence id since it is responsible for getting the
* edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT
* OUTSIDE OF THE WAL. The value you get will not be what you think it is.
@@ -298,7 +302,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
- private final HLog log;
+ private final WAL wal;
private final HRegionFileSystem fs;
protected final Configuration conf;
private final Configuration baseConf;
@@ -346,7 +350,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
//
// Context: During replay we want to ensure that we do not lose any data. So, we
- // have to be conservative in how we replay logs. For each store, we calculate
+ // have to be conservative in how we replay wals. For each store, we calculate
// the maxSeqId up to which the store was flushed. And, skip the edits which
// are equal to or lower than maxSeqId for each store.
// The following map is populated when opening the region
@@ -546,11 +550,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*
* @param tableDir qualified path of directory where region should be located,
* usually the table directory.
- * @param log The HLog is the outbound log for any updates to the HRegion
- * (There's a single HLog for all the HRegions on a single HRegionServer.)
- * The log file is a logfile from the previous execution that's
+ * @param wal The WAL is the outbound log for any updates to the HRegion
+ * The wal file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
- * appropriate log info for this HRegion. If there is a previous log file
+ * appropriate wal info for this HRegion. If there is a previous wal file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param fs is the filesystem.
@@ -561,11 +564,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rsServices reference to {@link RegionServerServices} or null
*/
@Deprecated
- public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
+ public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
final Configuration confParam, final HRegionInfo regionInfo,
final HTableDescriptor htd, final RegionServerServices rsServices) {
this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
- log, confParam, htd, rsServices);
+ wal, confParam, htd, rsServices);
}
/**
@@ -574,18 +577,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
*
* @param fs is the filesystem.
- * @param log The HLog is the outbound log for any updates to the HRegion
- * (There's a single HLog for all the HRegions on a single HRegionServer.)
- * The log file is a logfile from the previous execution that's
+ * @param wal The WAL is the outbound log for any updates to the HRegion
+ * The wal file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
- * appropriate log info for this HRegion. If there is a previous log file
+ * appropriate wal info for this HRegion. If there is a previous wal file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param confParam is global configuration settings.
* @param htd the table descriptor
* @param rsServices reference to {@link RegionServerServices} or null
*/
- public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
+ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
final HTableDescriptor htd, final RegionServerServices rsServices) {
if (htd == null) {
throw new IllegalArgumentException("Need table descriptor");
@@ -596,7 +598,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
this.comparator = fs.getRegionInfo().getComparator();
- this.log = log;
+ this.wal = wal;
this.fs = fs;
// 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
@@ -772,14 +774,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.splitPolicy = RegionSplitPolicy.create(this, conf);
this.lastFlushTime = EnvironmentEdgeManager.currentTime();
- // Use maximum of log sequenceid or that which was found in stores
+ // Use maximum of wal sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
long nextSeqid = maxSeqId + 1;
if (this.isRecovering) {
// In distributedLogReplay mode, we don't know the last change sequence number because region
// is opened before recovery completes. So we add a safety bumper to avoid new sequence number
// overlaps used sequence numbers
- nextSeqid = HLogUtil.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(),
+ nextSeqid = WALSplitter.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(),
this.fs.getRegionDir(), nextSeqid, (this.flushPerChanges + 10000000));
}
@@ -872,7 +874,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return maxSeqId;
}
- private void writeRegionOpenMarker(HLog log, long openSeqId) throws IOException {
+ private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
Map> storeFiles
= new TreeMap>(Bytes.BYTES_COMPARATOR);
for (Map.Entry entry : getStores().entrySet()) {
@@ -887,11 +889,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
getRegionServerServices().getServerName(), storeFiles);
- HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionOpenDesc,
+ WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
getSequenceId());
}
- private void writeRegionCloseMarker(HLog log) throws IOException {
+ private void writeRegionCloseMarker(WAL wal) throws IOException {
Map> storeFiles
= new TreeMap>(Bytes.BYTES_COMPARATOR);
for (Map.Entry entry : getStores().entrySet()) {
@@ -906,7 +908,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
getRegionServerServices().getServerName(), storeFiles);
- HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionEventDesc,
+ WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
getSequenceId());
}
@@ -1043,7 +1045,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
boolean wasRecovering = this.isRecovering;
this.isRecovering = newState;
if (wasRecovering && !isRecovering) {
- // Call only when log replay is over.
+ // Call only when wal replay is over.
coprocessorHost.postLogReplay();
}
}
@@ -1289,8 +1291,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
status.setStatus("Writing region close event to WAL");
- if (!abort && log != null && getRegionServerServices() != null) {
- writeRegionCloseMarker(log);
+ if (!abort && wal != null && getRegionServerServices() != null) {
+ writeRegionCloseMarker(wal);
}
this.closed.set(true);
@@ -1415,9 +1417,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return this.htableDescriptor;
}
- /** @return HLog in use for this region */
- public HLog getLog() {
- return this.log;
+ /** @return WAL in use for this region */
+ public WAL getWAL() {
+ return this.wal;
}
/**
@@ -1623,7 +1625,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @return true if the region needs compacting
*
* @throws IOException general io exceptions
- * @throws DroppedSnapshotException Thrown when replay of hlog is required
+ * @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted.
*/
public FlushResult flushcache() throws IOException {
@@ -1721,7 +1723,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/**
* Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
- * memstore, all of which have also been written to the log. We need to write those updates in the
+ * memstore, all of which have also been written to the wal. We need to write those updates in the
* memstore out to disk, while being able to process reads/writes as much as possible during the
* flush operation.
*
This method may block for some time. Every time you call it, we up the regions
@@ -1732,24 +1734,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @return object describing the flush's state
*
* @throws IOException general io exceptions
- * @throws DroppedSnapshotException Thrown when replay of hlog is required
+ * @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted.
*/
protected FlushResult internalFlushcache(MonitoredTask status)
throws IOException {
- return internalFlushcache(this.log, -1, status);
+ return internalFlushcache(this.wal, -1, status);
}
/**
- * @param wal Null if we're NOT to go via hlog/wal.
+ * @param wal Null if we're NOT to go via wal.
* @param myseqid The seqid to use if wal is null writing out flush file.
* @return object describing the flush's state
* @throws IOException
* @see #internalFlushcache(MonitoredTask)
*/
protected FlushResult internalFlushcache(
- final HLog wal, final long myseqid, MonitoredTask status)
- throws IOException {
+ final WAL wal, final long myseqid, MonitoredTask status) throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@@ -1763,14 +1764,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.updatesLock.writeLock().lock();
try {
if (this.memstoreSize.get() <= 0) {
- // Presume that if there are still no edits in the memstore, then
- // there are no edits for
- // this region out in the WAL/HLog subsystem so no need to do any
- // trickery clearing out
- // edits in the WAL system. Up the sequence number so the resulting
- // flush id is for
- // sure just beyond the last appended region edit (useful as a marker
- // when bulk loading,
+ // Presume that if there are still no edits in the memstore, then there are no edits for
+ // this region out in the WAL subsystem so no need to do any trickery clearing out
+ // edits in the WAL system. Up the sequence number so the resulting flush id is for
+ // sure just beyond the last appended region edit (useful as a marker when bulk loading,
// etc.)
// wal can be null replaying edits.
if (wal != null) {
@@ -1850,7 +1847,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if (wal != null) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- trxId = HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
}
@@ -1864,7 +1861,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
@@ -1889,12 +1886,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
try {
wal.sync(); // ensure that flush marker is sync'ed
} catch (IOException ioe) {
- LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: "
+ LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: "
+ StringUtils.stringifyException(ioe));
}
}
- // wait for all in-progress transactions to commit to HLog before
+ // wait for all in-progress transactions to commit to WAL before
// we can start the flush. This prevents
// uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that
@@ -1914,8 +1911,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
// Any failure from here on out will be catastrophic requiring server
- // restart so hlog content can be replayed and put back into the memstore.
- // Otherwise, the snapshot content while backed up in the hlog, it will not
+ // restart so wal content can be replayed and put back into the memstore.
+ // Otherwise, while the snapshot content is backed up in the wal, it will not
// be part of the current running servers state.
boolean compactionRequested = false;
try {
@@ -1948,12 +1945,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, true);
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
- // The hlog needs to be replayed so its content is restored to memstore.
+ // The wal needs to be replayed so its content is restored to memstore.
// Currently, only a server restart will do this.
// We used to only catch IOEs but its possible that we'd get other
// exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
@@ -1962,7 +1959,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable ex) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
@@ -2016,8 +2013,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @return Next sequence number unassociated with any actual edit.
* @throws IOException
*/
- private long getNextSequenceId(final HLog wal) throws IOException {
- HLogKey key = this.appendNoSyncNoAppend(wal, null);
+ private long getNextSequenceId(final WAL wal) throws IOException {
+ WALKey key = this.appendEmptyEdit(wal, null);
return key.getSequenceId();
}
@@ -2349,7 +2346,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
- private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
+ private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
private long replaySeqId = 0;
public ReplayBatch(MutationReplay[] operations, long seqId) {
super(operations);
@@ -2417,7 +2414,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* OperationStatusCode and the exceptionMessage if any.
* @throws IOException
*/
- public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
+ public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
throws IOException {
return batchMutate(new ReplayBatch(mutations, replaySeqId));
}
@@ -2532,7 +2529,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
int lastIndexExclusive = firstIndex;
boolean success = false;
int noOfPuts = 0, noOfDeletes = 0;
- HLogKey walKey = null;
+ WALKey walKey = null;
long mvccNum = 0;
try {
// ------------------------------------
@@ -2675,7 +2672,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// ------------------------------------
// STEP 3. Write back to memstore
// Write to memstore. It is ok to write to memstore
- // first without updating the HLog because we do not roll
+ // first without updating the WAL because we do not roll
// forward the memstore MVCC. The MVCC will be moved up when
// the complete operation is done. These changes are not yet
// visible to scanners till we update the MVCC. The MVCC is
@@ -2724,10 +2721,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
throw new IOException("Multiple nonces per batch and not in replay");
}
// txid should always increase, so having the one from the last call is ok.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(),
currentNonceGroup, currentNonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey,
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
walEdit, getSequenceId(), true, null);
walEdit = new WALEdit(isInReplay);
walKey = null;
@@ -2751,18 +2749,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// -------------------------
Mutation mutation = batchOp.getMutation(firstIndex);
if (walEdit.size() > 0) {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce);
if(isInReplay) {
walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
}
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
getSequenceId(), true, memstoreCells);
}
if(walKey == null){
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
// -------------------------------
@@ -3165,7 +3164,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
- * Add updates first to the hlog and then add values to memstore.
+ * Add updates first to the wal and then add values to memstore.
* Warning: Assumption is caller has lock on passed in row.
* @param edits Cell updates by column
* @throws IOException
@@ -3297,7 +3296,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/**
* Append the given map of family->edits to a WALEdit data structure.
- * This does not write to the HLog itself.
+ * This does not write to the WAL itself.
* @param familyMap map of family->edits
* @param walEdit the destination entry to append into
*/
@@ -3339,11 +3338,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
- * Read the edits log put under this region by wal log splitting process. Put
+ * Read the edits put under this region by wal splitting process. Put
* the recovered edits back up into this region.
*
- * <p>We can ignore any log message that has a sequence ID that's equal to or
- * lower than minSeqId. (Because we know such log messages are already
+ * <p>We can ignore any wal message that has a sequence ID that's equal to or
+ * lower than minSeqId. (Because we know such messages are already
* reflected in the HFiles.)
*
* <p>While this is running we are putting pressure on memory yet we are
@@ -3352,15 +3351,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* that if we're up against global memory limits, we'll not be flagged to flush
* because we are not online. We can't be flushed by usual mechanisms anyways;
* we're not yet online so our relative sequenceids are not yet aligned with
- * HLog sequenceids -- not till we come up online, post processing of split
+ * WAL sequenceids -- not till we come up online, post processing of split
* edits.
*
* <p>But to help relieve memory pressure, at least manage our own heap size
* flushing if are in excess of per-region limits. Flushing, though, we have
- * to be careful and avoid using the regionserver/hlog sequenceid. Its running
+ * to be careful and avoid using the regionserver/wal sequenceid. Its running
* on a different line to whats going on in here in this region context so if we
* crashed replaying these edits, but in the midst had a flush that used the
- * regionserver log with a sequenceid in excess of whats going on in here
+ * regionserver wal with a sequenceid in excess of whats going on in here
* in this region and with its split editlogs, then we could miss edits the
* next time we go to recover. So, we have to flush inline, using seqids that
* make sense in a this single region context only -- until we online.
@@ -3385,7 +3384,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long seqid = minSeqIdForTheRegion;
FileSystem fs = this.fs.getFileSystem();
- NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
+ NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + (files == null ? 0 : files.size())
+ " recovered edits file(s) under " + regiondir);
@@ -3405,7 +3404,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
maxSeqId = Math.abs(Long.parseLong(fileName));
if (maxSeqId <= minSeqIdForTheRegion) {
if (LOG.isDebugEnabled()) {
- String msg = "Maximum sequenceid for this log is " + maxSeqId
+ String msg = "Maximum sequenceid for this wal is " + maxSeqId
+ " and minimum sequenceid for the region is " + minSeqIdForTheRegion
+ ", skipped the whole file, path=" + edits;
LOG.debug(msg);
@@ -3429,7 +3428,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
}
if (skipErrors) {
- Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing. Renamed " + edits +
" as " + p, e);
@@ -3460,7 +3459,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/*
* @param edits File of recovered edits.
- * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in log
+ * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in wal
* must be larger than this to be replayed for each store.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
@@ -3475,17 +3474,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
MonitoredTask status = TaskMonitor.get().createStatus(msg);
FileSystem fs = this.fs.getFileSystem();
- status.setStatus("Opening logs");
- HLog.Reader reader = null;
+ status.setStatus("Opening recovered edits");
+ WAL.Reader reader = null;
try {
- reader = HLogFactory.createReader(fs, edits, conf);
+ reader = WALFactory.createReader(fs, edits, conf);
long currentEditSeqId = -1;
long currentReplaySeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
long intervalEdits = 0;
- HLog.Entry entry;
+ WAL.Entry entry;
Store store = null;
boolean reported_once = false;
ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
@@ -3499,7 +3498,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long lastReport = EnvironmentEdgeManager.currentTime();
while ((entry = reader.next()) != null) {
- HLogKey key = entry.getKey();
+ WALKey key = entry.getKey();
WALEdit val = entry.getEdit();
if (ng != null) { // some test, or nonces disabled
@@ -3541,7 +3540,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if (coprocessorHost != null) {
status.setStatus("Running pre-WAL-restore hook in coprocessors");
if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
- // if bypass this log entry, ignore it ...
+ // if bypass this wal entry, ignore it ...
continue;
}
}
@@ -3600,9 +3599,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
} catch (EOFException eof) {
- Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
msg = "Encountered EOF. Most likely due to Master failure during " +
- "log splitting, so we have this data in another edit. " +
+ "wal splitting, so we have this data in another edit. " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, eof);
status.abort(msg);
@@ -3610,7 +3609,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (ioe.getCause() instanceof ParseException) {
- Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
msg = "File corruption encountered! " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, ioe);
@@ -4429,11 +4428,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* {@link HConstants#REGION_IMPL} configuration property.
* @param tableDir qualified path of directory where region should be located,
* usually the table directory.
- * @param log The HLog is the outbound log for any updates to the HRegion
- * (There's a single HLog for all the HRegions on a single HRegionServer.)
- * The log file is a logfile from the previous execution that's
+ * @param wal The WAL is the outbound log for any updates to the HRegion
+ * The wal file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
- * appropriate log info for this HRegion. If there is a previous log file
+ * appropriate wal info for this HRegion. If there is a previous file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param fs is the filesystem.
@@ -4443,7 +4441,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param htd the table descriptor
* @return the new instance
*/
- static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
+ static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
RegionServerServices rsServices) {
try {
@@ -4452,11 +4450,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
(Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
Constructor<? extends HRegion> c =
- regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
+ regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
Configuration.class, HRegionInfo.class, HTableDescriptor.class,
RegionServerServices.class);
- return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
+ return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
} catch (Throwable e) {
// todo: what should I throw here?
throw new IllegalStateException("Could not instantiate a region instance.", e);
@@ -4466,11 +4464,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/**
* Convenience method creating new HRegions. Used by createTable and by the
* bootstrap code in the HMaster constructor.
- * Note, this method creates an {@link HLog} for the created region. It
- * needs to be closed explicitly. Use {@link HRegion#getLog()} to get
+ * Note, this method creates an {@link WAL} for the created region. It
+ * needs to be closed explicitly. Use {@link HRegion#getWAL()} to get
* access. When done with a region created using this method, you will
- * need to explicitly close the {@link HLog} it created too; it will not be
- * done for you. Not closing the log will leave at least a daemon thread
+ * need to explicitly close the {@link WAL} it created too; it will not be
+ * done for you. Not closing the wal will leave at least a daemon thread
* running. Call {@link #closeHRegion(HRegion)} and it will do
* necessary cleanup for you.
* @param info Info for region to create.
@@ -4489,27 +4487,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* This will do the necessary cleanup a call to
* {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
* requires. This method will close the region and then close its
- * associated {@link HLog} file. You use it if you call the other createHRegion,
- * the one that takes an {@link HLog} instance but don't be surprised by the
- * call to the {@link HLog#closeAndDelete()} on the {@link HLog} the
+ * associated {@link WAL} file. You can still use it if you call the other createHRegion,
+ * the one that takes an {@link WAL} instance but don't be surprised by the
+ * call to the {@link WAL#close()} on the {@link WAL} the
* HRegion was carrying.
* @throws IOException
*/
public static void closeHRegion(final HRegion r) throws IOException {
if (r == null) return;
r.close();
- if (r.getLog() == null) return;
- r.getLog().closeAndDelete();
+ if (r.getWAL() == null) return;
+ r.getWAL().close();
}
/**
* Convenience method creating new HRegions. Used by createTable.
- * The {@link HLog} for the created region needs to be closed explicitly.
- * Use {@link HRegion#getLog()} to get access.
+ * The {@link WAL} for the created region needs to be closed explicitly.
+ * Use {@link HRegion#getWAL()} to get access.
*
* @param info Info for region to create.
* @param rootDir Root directory for HBase instance
- * @param hlog shared HLog
+ * @param wal shared WAL
* @param initialize - true to initialize the region
* @return new HRegion
*
@@ -4518,72 +4516,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog,
+ final WAL wal,
final boolean initialize)
throws IOException {
return createHRegion(info, rootDir, conf, hTableDescriptor,
- hlog, initialize, false);
+ wal, initialize, false);
}
/**
* Convenience method creating new HRegions. Used by createTable.
- * The {@link HLog} for the created region needs to be closed
+ * The {@link WAL} for the created region needs to be closed
* explicitly, if it is not null.
- * Use {@link HRegion#getLog()} to get access.
+ * Use {@link HRegion#getWAL()} to get access.
*
* @param info Info for region to create.
* @param rootDir Root directory for HBase instance
- * @param hlog shared HLog
+ * @param wal shared WAL
* @param initialize - true to initialize the region
- * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
+ * @param ignoreWAL - true to skip generate new wal if it is null, mostly for createTable
* @return new HRegion
* @throws IOException
*/
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog,
- final boolean initialize, final boolean ignoreHLog)
+ final WAL wal,
+ final boolean initialize, final boolean ignoreWAL)
throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
- return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
+ return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, wal, initialize,
+ ignoreWAL);
}
/**
* Convenience method creating new HRegions. Used by createTable.
- * The {@link HLog} for the created region needs to be closed
+ * The {@link WAL} for the created region needs to be closed
* explicitly, if it is not null.
- * Use {@link HRegion#getLog()} to get access.
+ * Use {@link HRegion#getWAL()} to get access.
*
* @param info Info for region to create.
* @param rootDir Root directory for HBase instance
* @param tableDir table directory
- * @param hlog shared HLog
+ * @param wal shared WAL
* @param initialize - true to initialize the region
- * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
+ * @param ignoreWAL - true to skip generate new wal if it is null, mostly for createTable
* @return new HRegion
* @throws IOException
*/
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog,
- final boolean initialize, final boolean ignoreHLog)
+ final WAL wal,
+ final boolean initialize, final boolean ignoreWAL)
throws IOException {
LOG.info("creating HRegion " + info.getTable().getNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
" Table name == " + info.getTable().getNameAsString());
FileSystem fs = FileSystem.get(conf);
- HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
- HLog effectiveHLog = hlog;
- if (hlog == null && !ignoreHLog) {
- effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
- HConstants.HREGION_LOGDIR_NAME, conf);
+ HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
+ WAL effectiveWAL = wal;
+ if (wal == null && !ignoreWAL) {
+ // TODO HBASE-11983 There'll be no roller for this wal?
+ // The WAL subsystem will use the default rootDir rather than the passed in rootDir
+ // unless I pass along via the conf.
+ Configuration confForWAL = new Configuration(conf);
+ confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
+ effectiveWAL = (new WALFactory(confForWAL,
Collections.<WALActionsListener>singletonList(new MetricsWAL()),
+ "hregion-" + RandomStringUtils.randomNumeric(8))).
+ getWAL(info.getEncodedNameAsBytes());
}
HRegion region = HRegion.newHRegion(tableDir,
- effectiveHLog, fs, conf, info, hTableDescriptor, null);
+ effectiveWAL, fs, conf, info, hTableDescriptor, null);
if (initialize) {
- // If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
+ // If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when
// verifying the WALEdits.
region.setSequenceId(region.initialize(null));
}
@@ -4593,25 +4599,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog)
+ final WAL wal)
throws IOException {
- return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
+ return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
}
/**
* Open a Region.
* @param info Info for region to be opened.
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @return new HRegion
*
* @throws IOException
*/
public static HRegion openHRegion(final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal,
+ final HTableDescriptor htd, final WAL wal,
final Configuration conf)
throws IOException {
return openHRegion(info, htd, wal, conf, null, null);
@@ -4621,9 +4627,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* Open a Region.
* @param info Info for region to be opened
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param conf The Configuration object to use.
* @param rsServices An interface we can request flushes against.
@@ -4633,7 +4639,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal, final Configuration conf,
+ final HTableDescriptor htd, final WAL wal, final Configuration conf,
final RegionServerServices rsServices,
final CancelableProgressable reporter)
throws IOException {
@@ -4645,16 +4651,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param conf The Configuration object to use.
* @return new HRegion
* @throws IOException
*/
public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal, final Configuration conf)
+ final HTableDescriptor htd, final WAL wal, final Configuration conf)
throws IOException {
return openHRegion(rootDir, info, htd, wal, conf, null, null);
}
@@ -4664,9 +4670,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param conf The Configuration object to use.
* @param rsServices An interface we can request flushes against.
@@ -4675,7 +4681,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal, final Configuration conf,
+ final HTableDescriptor htd, final WAL wal, final Configuration conf,
final RegionServerServices rsServices,
final CancelableProgressable reporter)
throws IOException {
@@ -4696,15 +4702,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @return new HRegion
* @throws IOException
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
- final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
+ final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
throws IOException {
return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
}
@@ -4716,9 +4722,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param rsServices An interface we can request flushes against.
* @param reporter An interface we can report progress against.
@@ -4726,7 +4732,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
- final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
+ final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
final RegionServerServices rsServices, final CancelableProgressable reporter)
throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
@@ -4740,9 +4746,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param rsServices An interface we can request flushes against.
* @param reporter An interface we can report progress against.
@@ -4750,8 +4756,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
- final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
- final RegionServerServices rsServices, final CancelableProgressable reporter)
+ final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
+ final WAL wal, final RegionServerServices rsServices,
+ final CancelableProgressable reporter)
throws IOException {
if (info == null) throw new NullPointerException("Passed region info is null");
if (LOG.isDebugEnabled()) {
@@ -4772,7 +4779,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
throws IOException {
HRegionFileSystem regionFs = other.getRegionFileSystem();
- HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
+ HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
return r.openHRegion(reporter);
}
@@ -4789,8 +4796,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.openSeqNum = initialize(reporter);
this.setSequenceId(openSeqNum);
- if (log != null && getRegionServerServices() != null) {
- writeRegionOpenMarker(log, openSeqNum);
+ if (wal != null && getRegionServerServices() != null) {
+ writeRegionOpenMarker(wal, openSeqNum);
}
return this;
}
@@ -4812,7 +4819,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
fs.commitDaughterRegion(hri);
// Create the daughter HRegion instance
- HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
+ HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
this.getBaseConf(), hri, this.getTableDesc(), rsServices);
r.readRequestsCount.set(this.getReadRequestsCount() / 2);
r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
@@ -4827,7 +4834,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
final HRegion region_b) throws IOException {
- HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
+ HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
this.getTableDesc(), this.rsServices);
r.readRequestsCount.set(this.getReadRequestsCount()
@@ -5174,7 +5181,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
List<Cell> memstoreCells = new ArrayList<Cell>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
long mvccNum = 0;
- HLogKey walKey = null;
+ WALKey walKey = null;
try {
// 2. Acquire the row lock(s)
acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
@@ -5219,16 +5226,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long txid = 0;
// 8. Append no sync
if (!walEdit.isEmpty()) {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
processor.getClusterIds(), nonceGroup, nonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdit, getSequenceId(), true, memstoreCells);
}
if(walKey == null){
- // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
+ // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
// to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
// 9. Release region lock
@@ -5368,7 +5376,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.writeRequestsCount.increment();
long mvccNum = 0;
WriteEntry w = null;
- HLogKey walKey = null;
+ WALKey walKey = null;
RowLock rowLock = null;
List memstoreCells = new ArrayList();
boolean doRollBackMemstore = false;
@@ -5473,7 +5481,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
- //store the kvs to the temporary memstore before writing HLog
+ //store the kvs to the temporary memstore before writing WAL
tempMemstore.put(store, kvs);
}
@@ -5501,16 +5509,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
- txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
+ txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
this.sequenceId, true, memstoreCells);
} else {
recordMutationWithoutWal(append.getFamilyCellMap());
}
if (walKey == null) {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
size = this.addAndGetGlobalMemstoreSize(size);
@@ -5585,7 +5594,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.writeRequestsCount.increment();
RowLock rowLock = null;
WriteEntry w = null;
- HLogKey walKey = null;
+ WALKey walKey = null;
long mvccNum = 0;
List memstoreCells = new ArrayList();
boolean doRollBackMemstore = false;
@@ -5692,7 +5701,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
- //store the kvs to the temporary memstore before writing HLog
+ //store the kvs to the temporary memstore before writing WAL
if (!kvs.isEmpty()) {
tempMemstore.put(store, kvs);
}
@@ -5726,9 +5735,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdits, getSequenceId(), true, memstoreCells);
} else {
recordMutationWithoutWal(increment.getFamilyCellMap());
@@ -5736,7 +5746,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
if(walKey == null){
// Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
} finally {
this.updatesLock.readLock().unlock();
@@ -5942,13 +5952,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
private static void processTable(final FileSystem fs, final Path p,
- final HLog log, final Configuration c,
+ final WALFactory walFactory, final Configuration c,
final boolean majorCompact)
throws IOException {
HRegion region;
// Currently expects tables have one region only.
if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
- region = HRegion.newHRegion(p, log, fs, c,
+ final WAL wal = walFactory.getMetaWAL(
+ HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+ region = HRegion.newHRegion(p, wal, fs, c,
HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
} else {
throw new IOException("Not a known catalog table: " + p.toString());
@@ -6248,13 +6260,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
private void syncOrDefer(long txid, Durability durability) throws IOException {
if (this.getRegionInfo().isMetaRegion()) {
- this.log.sync(txid);
+ this.wal.sync(txid);
} else {
switch(durability) {
case USE_DEFAULT:
// do what table defaults to
- if (shouldSyncLog()) {
- this.log.sync(txid);
+ if (shouldSyncWAL()) {
+ this.wal.sync(txid);
}
break;
case SKIP_WAL:
@@ -6266,16 +6278,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
case SYNC_WAL:
case FSYNC_WAL:
// sync the WAL edit (SYNC and FSYNC treated the same for now)
- this.log.sync(txid);
+ this.wal.sync(txid);
break;
}
}
}
/**
- * Check whether we should sync the log from the table's durability settings
+ * Check whether we should sync the wal from the table's durability settings
*/
- private boolean shouldSyncLog() {
+ private boolean shouldSyncWAL() {
return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
}
@@ -6329,13 +6341,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
final Configuration c = HBaseConfiguration.create();
final FileSystem fs = FileSystem.get(c);
final Path logdir = new Path(c.get("hbase.tmp.dir"));
- final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
+ final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
- final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
+ final Configuration walConf = new Configuration(c);
+ FSUtils.setRootDir(walConf, logdir);
+ final WALFactory wals = new WALFactory(walConf, null, logname);
try {
- processTable(fs, tableDir, log, c, majorCompact);
+ processTable(fs, tableDir, wals, c, majorCompact);
} finally {
- log.close();
+ wals.close();
// TODO: is this still right?
BlockCache bc = new CacheConfig(c).getBlockCache();
if (bc != null) bc.shutdown();
@@ -6496,20 +6510,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
- * Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore
+ * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore
* the WALEdit append later.
* @param wal
* @param cells list of Cells inserted into memstore. Those Cells are passed in order to
- * be updated with right mvcc values(their log sequence number)
+ * be updated with right mvcc values(their wal sequence number)
* @return Return the key used appending with no sync and no append.
* @throws IOException
*/
- private HLogKey appendNoSyncNoAppend(final HLog wal, List cells) throws IOException {
- HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
- HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ private WALKey appendEmptyEdit(final WAL wal, List cells) throws IOException {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
+ WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
// Call append but with an empty WALEdit. The returned seqeunce id will not be associated
// with any edit and we can be sure it went in after all outstanding appends.
- wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
+ wal.append(getTableDesc(), getRegionInfo(), key,
WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
return key;
}
@@ -6519,8 +6534,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public void syncWal() throws IOException {
- if(this.log != null) {
- this.log.sync();
+ if(this.wal != null) {
+ this.wal.sync();
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index db66f5c..c3e8650 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
@@ -124,9 +125,10 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
@@ -324,15 +326,13 @@ public class HRegionServer extends HasThread implements
*/
Chore periodicFlusher;
- // HLog and HLog roller. log is protected rather than private to avoid
- // eclipse warning when accessed by inner classes
- protected volatile HLog hlog;
- // The meta updates are written to a different hlog. If this
- // regionserver holds meta regions, then this field will be non-null.
- protected volatile HLog hlogForMeta;
+ protected volatile WALFactory walFactory;
- LogRoller hlogRoller;
- LogRoller metaHLogRoller;
+ // WAL roller. log is protected rather than private to avoid
+ // eclipse warning when accessed by inner classes
+ final LogRoller walRoller;
+ // Lazily initialized if this RegionServer hosts a meta table.
+ final AtomicReference metawalRoller = new AtomicReference();
// flag set after we're done setting up server threads
final AtomicBoolean online = new AtomicBoolean(false);
@@ -543,6 +543,7 @@ public class HRegionServer extends HasThread implements
rpcServices.start();
putUpWebUI();
+ this.walRoller = new LogRoller(this, this);
}
protected void login(UserProvider user, String host) throws IOException {
@@ -971,7 +972,7 @@ public class HRegionServer extends HasThread implements
//fsOk flag may be changed when closing regions throws exception.
if (this.fsOk) {
- closeWAL(!abortRequested);
+ shutdownWAL(!abortRequested);
}
// Make sure the proxy is down.
@@ -1073,7 +1074,8 @@ public class HRegionServer extends HasThread implements
}
}
- ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
+ ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
+ throws IOException {
// We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
// per second, and other metrics As long as metrics are part of ServerLoad it's best to use
// the wrapper to compute those numbers in one place.
@@ -1092,7 +1094,7 @@ public class HRegionServer extends HasThread implements
serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
- Set coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
+ Set coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
for (String coprocessor : coprocessors) {
serverLoad.addCoprocessors(
Coprocessor.newBuilder().setName(coprocessor).build());
@@ -1101,6 +1103,10 @@ public class HRegionServer extends HasThread implements
RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
for (HRegion region : regions) {
serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
+ for (String coprocessor :
+ getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()) {
+ serverLoad.addCoprocessors(Coprocessor.newBuilder().setName(coprocessor).build());
+ }
}
serverLoad.setReportStartTime(reportStartTime);
serverLoad.setReportEndTime(reportEndTime);
@@ -1189,33 +1195,24 @@ public class HRegionServer extends HasThread implements
return interrupted;
}
- private void closeWAL(final boolean delete) {
- if (this.hlogForMeta != null) {
- // All hlogs (meta and non-meta) are in the same directory. Don't call
- // closeAndDelete here since that would delete all hlogs not just the
- // meta ones. We will just 'close' the hlog for meta here, and leave
- // the directory cleanup to the follow-on closeAndDelete call.
- try {
- this.hlogForMeta.close();
- } catch (Throwable e) {
- LOG.error("Metalog close and delete failed", RemoteExceptionHandler.checkThrowable(e));
- }
- }
- if (this.hlog != null) {
+ private void shutdownWAL(final boolean close) {
+ if (this.walFactory != null) {
try {
- if (delete) {
- hlog.closeAndDelete();
+ if (close) {
+ walFactory.close();
} else {
- hlog.close();
+ walFactory.shutdown();
}
} catch (Throwable e) {
- LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e));
+ e = RemoteExceptionHandler.checkThrowable(e);
+ LOG.error("Shutdown / close of WAL failed: " + e);
+ LOG.debug("Shutdown / close exception details:", e);
}
}
}
/*
- * Run init. Sets up hlog and starts up all server threads.
+ * Run init. Sets up wal and starts up all server threads.
*
* @param c Extra configuration.
*/
@@ -1253,7 +1250,7 @@ public class HRegionServer extends HasThread implements
ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
this.cacheConfig = new CacheConfig(conf);
- this.hlog = setupWALAndReplication();
+ this.walFactory = setupWALAndReplication();
// Init in here rather than in constructor after thread name has been set
this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
@@ -1497,10 +1494,10 @@ public class HRegionServer extends HasThread implements
* @return A WAL instance.
* @throws IOException
*/
- private HLog setupWALAndReplication() throws IOException {
+ private WALFactory setupWALAndReplication() throws IOException {
+ // TODO Replication make assumptions here based on the default filesystem impl
final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- final String logName
- = HLogUtil.getHLogDirectoryName(this.serverName.toString());
+ final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
Path logdir = new Path(rootDir, logName);
if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
@@ -1513,66 +1510,44 @@ public class HRegionServer extends HasThread implements
// log directories.
createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
- return instantiateHLog(rootDir, logName);
- }
-
- private HLog getMetaWAL() throws IOException {
- if (this.hlogForMeta != null) return this.hlogForMeta;
- final String logName = HLogUtil.getHLogDirectoryName(this.serverName.toString());
- Path logdir = new Path(rootDir, logName);
- if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
- this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), rootDir, logName,
- this.conf, getMetaWALActionListeners(), this.serverName.toString());
- return this.hlogForMeta;
- }
-
- /**
- * Called by {@link #setupWALAndReplication()} creating WAL instance.
- * @param rootdir
- * @param logName
- * @return WAL instance.
- * @throws IOException
- */
- protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
- return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
- getWALActionListeners(), this.serverName.toString());
- }
-
- /**
- * Called by {@link #instantiateHLog(Path, String)} setting up WAL instance.
- * Add any {@link WALActionsListener}s you want inserted before WAL startup.
- * @return List of WALActionsListener that will be passed in to
- * {@link org.apache.hadoop.hbase.regionserver.wal.FSHLog} on construction.
- */
- protected List getWALActionListeners() {
- List listeners = new ArrayList();
- // Log roller.
- this.hlogRoller = new LogRoller(this, this);
- listeners.add(this.hlogRoller);
+ // listeners the wal factory will add to wals it creates.
+ final List listeners = new ArrayList();
+ listeners.add(new MetricsWAL());
if (this.replicationSourceHandler != null &&
this.replicationSourceHandler.getWALActionsListener() != null) {
// Replication handler is an implementation of WALActionsListener.
listeners.add(this.replicationSourceHandler.getWALActionsListener());
}
- return listeners;
+
+ return new WALFactory(conf, listeners, serverName.toString());
}
- protected List getMetaWALActionListeners() {
- List listeners = new ArrayList();
+ /**
+ * We initialize the roller for the wal that handles meta lazily
+ * since we don't know if this regionserver will handle it. All calls to
+ * this method return a reference to that same roller. As newly referenced
+ * meta regions are brought online, they will be offered to the roller for maintenance.
+ * As a part of that registration process, the roller will add itself as a
+ * listener on the wal.
+ */
+ protected LogRoller ensureMetaWALRoller() {
// Using a tmp log roller to ensure metaLogRoller is alive once it is not
// null
- MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this);
- String n = Thread.currentThread().getName();
- Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
- n + "-MetaLogRoller", uncaughtExceptionHandler);
- this.metaHLogRoller = tmpLogRoller;
- tmpLogRoller = null;
- listeners.add(this.metaHLogRoller);
- return listeners;
- }
-
- protected LogRoller getLogRoller() {
- return hlogRoller;
+ LogRoller roller = metawalRoller.get();
+ if (null == roller) {
+ LogRoller tmpLogRoller = new LogRoller(this, this);
+ String n = Thread.currentThread().getName();
+ Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
+ n + "-MetaLogRoller", uncaughtExceptionHandler);
+ if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
+ roller = tmpLogRoller;
+ } else {
+ // Another thread won starting the roller
+ Threads.shutdown(tmpLogRoller.getThread());
+ roller = metawalRoller.get();
+ }
+ }
+ return roller;
}
public MetricsRegionServer getRegionServerMetrics() {
@@ -1615,7 +1590,7 @@ public class HRegionServer extends HasThread implements
this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
"hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
- Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), getName() + ".logRoller",
+ Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);
this.cacheFlusher.start(uncaughtExceptionHandler);
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() +
@@ -1662,7 +1637,7 @@ public class HRegionServer extends HasThread implements
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
- this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this);
+ this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
splitLogWorker.start();
}
@@ -1725,38 +1700,37 @@ public class HRegionServer extends HasThread implements
}
// Verify that all threads are alive
if (!(leases.isAlive()
- && cacheFlusher.isAlive() && hlogRoller.isAlive()
+ && cacheFlusher.isAlive() && walRoller.isAlive()
&& this.compactionChecker.isAlive()
&& this.periodicFlusher.isAlive())) {
stop("One or more threads are no longer alive -- stop");
return false;
}
- if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
- stop("Meta HLog roller thread is no longer alive -- stop");
+ final LogRoller metawalRoller = this.metawalRoller.get();
+ if (metawalRoller != null && !metawalRoller.isAlive()) {
+ stop("Meta WAL roller thread is no longer alive -- stop");
return false;
}
return true;
}
- public HLog getWAL() {
- try {
- return getWAL(null);
- } catch (IOException e) {
- LOG.warn("getWAL threw exception " + e);
- return null;
- }
- }
+ private static final byte[] UNSPECIFIED_REGION = new byte[]{};
@Override
- public HLog getWAL(HRegionInfo regionInfo) throws IOException {
- //TODO: at some point this should delegate to the HLogFactory
- //currently, we don't care about the region as much as we care about the
- //table.. (hence checking the tablename below)
+ public WAL getWAL(HRegionInfo regionInfo) throws IOException {
+ WAL wal;
+ LogRoller roller = walRoller;
//_ROOT_ and hbase:meta regions have separate WAL.
if (regionInfo != null && regionInfo.isMetaTable()) {
- return getMetaWAL();
+ roller = ensureMetaWALRoller();
+ wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
+ } else if (regionInfo == null) {
+ wal = walFactory.getWAL(UNSPECIFIED_REGION);
+ } else {
+ wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
}
- return this.hlog;
+ roller.addWAL(wal);
+ return wal;
}
@Override
@@ -1982,11 +1956,12 @@ public class HRegionServer extends HasThread implements
if (this.spanReceiverHost != null) {
this.spanReceiverHost.closeReceivers();
}
- if (this.hlogRoller != null) {
- Threads.shutdown(this.hlogRoller.getThread());
+ if (this.walRoller != null) {
+ Threads.shutdown(this.walRoller.getThread());
}
- if (this.metaHLogRoller != null) {
- Threads.shutdown(this.metaHLogRoller.getThread());
+ final LogRoller metawalRoller = this.metawalRoller.get();
+ if (metawalRoller != null) {
+ Threads.shutdown(metawalRoller.getThread());
}
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
@@ -2524,11 +2499,24 @@ public class HRegionServer extends HasThread implements
// used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
public String[] getRegionServerCoprocessors() {
- TreeSet coprocessors = new TreeSet(
- this.hlog.getCoprocessorHost().getCoprocessors());
+ TreeSet coprocessors = new TreeSet();
+ try {
+ coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
+ } catch (IOException exception) {
+ LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
+ "skipping.");
+ LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
+ }
Collection regions = getOnlineRegionsLocalContext();
for (HRegion region: regions) {
coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
+ try {
+ coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
+ } catch (IOException exception) {
+ LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
+ "; skipping.");
+ LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
+ }
}
return coprocessors.toArray(new String[coprocessors.size()]);
}
@@ -2665,16 +2653,22 @@ public class HRegionServer extends HasThread implements
HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
if (destination != null) {
- HLog wal = getWAL();
- long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
- if (closeSeqNum == HConstants.NO_SEQNUM) {
- // No edits in WAL for this region; get the sequence number when the region was opened.
- closeSeqNum = r.getOpenSeqNum();
+ try {
+ WAL wal = getWAL(r.getRegionInfo());
+ long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
if (closeSeqNum == HConstants.NO_SEQNUM) {
- closeSeqNum = 0;
+ // No edits in WAL for this region; get the sequence number when the region was opened.
+ closeSeqNum = r.getOpenSeqNum();
+ if (closeSeqNum == HConstants.NO_SEQNUM) {
+ closeSeqNum = 0;
+ }
}
+ addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
+ } catch (IOException exception) {
+ LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
+ "; not adding to moved regions.");
+ LOG.debug("Exception details for failure to get wal", exception);
}
- addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
}
this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
return toReturn != null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index eb51c50..2adaaba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -79,7 +79,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -1216,7 +1216,7 @@ public class HStore implements Store {
*/
private void writeCompactionWalRecord(Collection filesCompacted,
Collection newFiles) throws IOException {
- if (region.getLog() == null) return;
+ if (region.getWAL() == null) return;
List inputPaths = new ArrayList(filesCompacted.size());
for (StoreFile f : filesCompacted) {
inputPaths.add(f.getPath());
@@ -1228,7 +1228,7 @@ public class HStore implements Store {
HRegionInfo info = this.region.getRegionInfo();
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
- HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
+ WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 8179c98..821756d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -18,15 +18,20 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HasThread;
@@ -36,17 +41,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
/**
- * Runs periodically to determine if the HLog should be rolled.
+ * Runs periodically to determine if the WAL should be rolled.
*
* NOTE: This class extends Thread rather than Chore because the sleep time
* can be interrupted when there is something to do, rather than the Chore
* sleep time which is invariant.
+ *
+ * TODO: change to a pool of threads
*/
@InterfaceAudience.Private
-class LogRoller extends HasThread implements WALActionsListener {
+class LogRoller extends HasThread {
static final Log LOG = LogFactory.getLog(LogRoller.class);
private final ReentrantLock rollLock = new ReentrantLock();
private final AtomicBoolean rollLog = new AtomicBoolean(false);
+ private final ConcurrentHashMap walNeedsRoll =
+ new ConcurrentHashMap();
private final Server server;
protected final RegionServerServices services;
private volatile long lastrolltime = System.currentTimeMillis();
@@ -54,6 +63,32 @@ class LogRoller extends HasThread implements WALActionsListener {
private final long rollperiod;
private final int threadWakeFrequency;
+ public void addWAL(final WAL wal) {
+ if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
+ wal.registerWALActionsListener(new WALActionsListener.Base() {
+ @Override
+ public void logRollRequested() {
+ walNeedsRoll.put(wal, Boolean.TRUE);
+ // TODO logs will contend with each other here, replace with e.g. DelayedQueue
+ synchronized(rollLog) {
+ rollLog.set(true);
+ rollLog.notifyAll();
+ }
+ }
+ });
+ }
+ }
+
+ public void requestRollAll() {
+ for (WAL wal : walNeedsRoll.keySet()) {
+ walNeedsRoll.put(wal, Boolean.TRUE);
+ }
+ synchronized(rollLog) {
+ rollLog.set(true);
+ rollLog.notifyAll();
+ }
+ }
+
/** @param server */
public LogRoller(final Server server, final RegionServerServices services) {
super();
@@ -84,19 +119,24 @@ class LogRoller extends HasThread implements WALActionsListener {
}
// Time for periodic roll
if (LOG.isDebugEnabled()) {
- LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
+ LOG.debug("WAL roll period " + this.rollperiod + "ms elapsed");
}
} else if (LOG.isDebugEnabled()) {
- LOG.debug("HLog roll requested");
+ LOG.debug("WAL roll requested");
}
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
try {
this.lastrolltime = now;
- // Force the roll if the logroll.period is elapsed or if a roll was requested.
- // The returned value is an array of actual region names.
- byte [][] regionsToFlush = getWAL().rollWriter(periodic || rollLog.get());
- if (regionsToFlush != null) {
- for (byte [] r: regionsToFlush) scheduleFlush(r);
+ for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
+ final WAL wal = entry.getKey();
+ // Force the roll if the logroll.period is elapsed or if a roll was requested.
+ // The returned value is an array of actual region names.
+ final byte [][] regionsToFlush = wal.rollWriter(periodic ||
+ entry.getValue().booleanValue());
+ walNeedsRoll.put(wal, Boolean.FALSE);
+ if (regionsToFlush != null) {
+ for (byte [] r: regionsToFlush) scheduleFlush(r);
+ }
}
} catch (FailedLogCloseException e) {
server.abort("Failed log close in log roller", e);
@@ -141,51 +181,4 @@ class LogRoller extends HasThread implements WALActionsListener {
}
}
- public void logRollRequested() {
- synchronized (rollLog) {
- rollLog.set(true);
- rollLog.notifyAll();
- }
- }
-
- protected HLog getWAL() throws IOException {
- return this.services.getWAL(null);
- }
-
- @Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- // Not interested
- }
-
- @Override
- public void postLogRoll(Path oldPath, Path newPath) throws IOException {
- // Not interested
- }
-
- @Override
- public void preLogArchive(Path oldPath, Path newPath) throws IOException {
- // Not interested
- }
-
- @Override
- public void postLogArchive(Path oldPath, Path newPath) throws IOException {
- // Not interested
- }
-
- @Override
- public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
- WALEdit logEdit) {
- // Not interested.
- }
-
- @Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
- WALEdit logEdit) {
- //Not interested
- }
-
- @Override
- public void logCloseRequested() {
- // not interested
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 72242c6..b2820dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -458,11 +458,11 @@ class MemStoreFlusher implements FlushRequester {
}
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
- // section, we get a DroppedSnapshotException and a replay of hlog
+ // section, we get a DroppedSnapshotException and a replay of wal
// is required. Currently the only way to do this is a restart of
// the server. Abort because hdfs is probably bad (HBASE-644 is a case
// where hdfs was bad but passed the hdfs check).
- server.abort("Replay of HLog required. Forcing server shutdown", ex);
+ server.abort("Replay of WAL required. Forcing server shutdown", ex);
return false;
} catch (IOException ex) {
LOG.error("Cache flush failed" +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java
deleted file mode 100644
index 467cfdf..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-
-@InterfaceAudience.Private
-class MetaLogRoller extends LogRoller {
- public MetaLogRoller(Server server, RegionServerServices services) {
- super(server, services);
- }
- @Override
- protected HLog getWAL() throws IOException {
- //The argument to getWAL below could either be HRegionInfo.FIRST_META_REGIONINFO or
- //HRegionInfo.ROOT_REGIONINFO. Both these share the same WAL.
- return services.getWAL(HRegionInfo.FIRST_META_REGIONINFO);
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index a606e8c..415e271 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.metrics2.MetricsExecutor;
@@ -50,8 +51,8 @@ class MetricsRegionServerWrapperImpl
private BlockCache blockCache;
private volatile long numStores = 0;
- private volatile long numHLogFiles = 0;
- private volatile long hlogFileSize = 0;
+ private volatile long numWALFiles = 0;
+ private volatile long walFileSize = 0;
private volatile long numStoreFiles = 0;
private volatile long memstoreSize = 0;
private volatile long storeFileSize = 0;
@@ -274,13 +275,13 @@ class MetricsRegionServerWrapperImpl
}
@Override
- public long getNumHLogFiles() {
- return numHLogFiles;
+ public long getNumWALFiles() {
+ return numWALFiles;
}
@Override
- public long getHLogFileSize() {
- return hlogFileSize;
+ public long getWALFileSize() {
+ return walFileSize;
}
@Override
@@ -480,21 +481,11 @@ class MetricsRegionServerWrapperImpl
}
lastRan = currentTime;
+ numWALFiles = DefaultWALProvider.getNumLogFiles(regionServer.walFactory);
+ walFileSize = DefaultWALProvider.getLogFileSize(regionServer.walFactory);
+
//Copy over computed values so that no thread sees half computed values.
numStores = tempNumStores;
- long tempNumHLogFiles = regionServer.hlog.getNumLogFiles();
- // meta logs
- if (regionServer.hlogForMeta != null) {
- tempNumHLogFiles += regionServer.hlogForMeta.getNumLogFiles();
- }
- numHLogFiles = tempNumHLogFiles;
-
- long tempHlogFileSize = regionServer.hlog.getLogFileSize();
- if (regionServer.hlogForMeta != null) {
- tempHlogFileSize += regionServer.hlogForMeta.getLogFileSize();
- }
- hlogFileSize = tempHlogFileSize;
-
numStoreFiles = tempNumStoreFiles;
memstoreSize = tempMemstoreSize;
storeFileSize = tempStoreFileSize;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7d291bf..06e51c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -149,9 +149,9 @@ import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -687,13 +687,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws IOException
*/
private OperationStatus [] doReplayBatchOp(final HRegion region,
- final List<HLogSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
+ final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
- for (Iterator<HLogSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
- HLogSplitter.MutationReplay m = it.next();
+ for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
+ WALSplitter.MutationReplay m = it.next();
if (m.type == MutationType.PUT) {
batchContainsPuts = true;
@@ -718,7 +718,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionServer.cacheFlusher.reclaimMemStoreMemory();
}
return region.batchReplay(mutations.toArray(
- new HLogSplitter.MutationReplay[mutations.size()]), replaySeqId);
+ new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
} finally {
if (regionServer.metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTime();
@@ -1090,10 +1090,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return builder.build();
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
- // section, we get a DroppedSnapshotException and a replay of hlog
+ // section, we get a DroppedSnapshotException and a replay of wal
// is required. Currently the only way to do this is a restart of
// the server.
- regionServer.abort("Replay of HLog required. Forcing server shutdown", ex);
+ regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
throw new ServiceException(ex);
} catch (IOException ie) {
throw new ServiceException(ie);
@@ -1444,7 +1444,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
HRegion region = regionServer.getRegionByEncodedName(
entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
- List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
+ List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
for (WALEntry entry : entries) {
if (regionServer.nonceManager != null) {
long nonceGroup = entry.getKey().hasNonceGroup()
@@ -1452,9 +1452,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
}
- Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
- new Pair<HLogKey, WALEdit>();
- List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
+ Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
+ new Pair<WALKey, WALEdit>();
+ List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
cells, walEntry);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
@@ -1483,7 +1483,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
region.syncWal();
if (coprocessorHost != null) {
- for (Pair<HLogKey, WALEdit> wal : walEntries) {
+ for (Pair<WALKey, WALEdit> wal : walEntries) {
coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
wal.getSecond());
}
@@ -1536,14 +1536,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen();
requestCount.increment();
regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
- HLog wal = regionServer.getWAL();
- byte[][] regionsToFlush = wal.rollWriter(true);
+ regionServer.walRoller.requestRollAll();
+ regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
- if (regionsToFlush != null) {
- for (byte[] region: regionsToFlush) {
- builder.addRegionToFlush(ByteStringer.wrap(region));
- }
- }
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index d0362c8..bff2705 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -109,6 +110,8 @@ public class RegionCoprocessorHost
private static final int LATENCY_BUFFER_SIZE = 100;
private final BlockingQueue<Long> coprocessorTimeNanos = new ArrayBlockingQueue<Long>(
LATENCY_BUFFER_SIZE);
+ private final boolean useLegacyPre;
+ private final boolean useLegacyPost;
/**
* Constructor
@@ -122,6 +125,14 @@ public class RegionCoprocessorHost
this.region = region;
this.rsServices = services;
this.sharedData = sharedData;
+ // Pick which version of the WAL related events we'll call.
+ // This way we avoid calling the new version on older RegionObservers so
+ // we can maintain binary compatibility.
+ // See notes in javadoc for RegionObserver
+ useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
+ HRegionInfo.class, WALKey.class, WALEdit.class);
+ useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
+ HRegionInfo.class, WALKey.class, WALEdit.class);
}
/** @return the region */
@@ -1309,35 +1320,76 @@ public class RegionCoprocessorHost
* @return true if default behavior should be bypassed, false otherwise
* @throws IOException
*/
- public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
+ public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
final WALEdit logEdit) throws IOException {
return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
- oserver.preWALRestore(ctx, info, logKey, logEdit);
+ // Once we don't need to support the legacy call, replace RegionOperation with a version
+ // that's ObserverContext and avoid this cast.
+ final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
+ if (env.useLegacyPre) {
+ if (logKey instanceof HLogKey) {
+ oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
+ } else {
+ legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
+ }
+ } else {
+ oserver.preWALRestore(ctx, info, logKey, logEdit);
+ }
}
});
}
/**
+ * @return true if default behavior should be bypassed, false otherwise
+ * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}
+ */
+ @Deprecated
+ public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
+ final WALEdit logEdit) throws IOException {
+ return preWALRestore(info, (WALKey)logKey, logEdit);
+ }
+
+ /**
* @param info
* @param logKey
* @param logEdit
* @throws IOException
*/
- public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
+ public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
- oserver.postWALRestore(ctx, info, logKey, logEdit);
+ // Once we don't need to support the legacy call, replace RegionOperation with a version
+ // that's ObserverContext and avoid this cast.
+ final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
+ if (env.useLegacyPost) {
+ if (logKey instanceof HLogKey) {
+ oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
+ } else {
+ legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
+ }
+ } else {
+ oserver.postWALRestore(ctx, info, logKey, logEdit);
+ }
}
});
}
/**
+ * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}
+ */
+ @Deprecated
+ public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
+ throws IOException {
+ postWALRestore(info, (WALKey)logKey, logEdit);
+ }
+
+ /**
* @param familyPaths pairs of { CF, file path } submitted for bulk load
* @return true if the default operation should be bypassed
* @throws IOException
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
index 479aced..879b573 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
@@ -34,7 +34,7 @@ public class RegionServerAccounting {
private final AtomicLong atomicGlobalMemstoreSize = new AtomicLong(0);
- // Store the edits size during replaying HLog. Use this to roll back the
+ // Store the edits size during replaying WAL. Use this to roll back the
// global memstore size once a region opening failed.
private final ConcurrentMap<byte[], AtomicLong> replayEditsPerRegion =
new ConcurrentSkipListMap<byte[], AtomicLong>(Bytes.BYTES_COMPARATOR);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index e8b953e..5ea630e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.zookeeper.KeeperException;
/**
@@ -45,9 +45,9 @@ public interface RegionServerServices
*/
boolean isStopping();
- /** @return the HLog for a particular region. Pass null for getting the
+ /** @return the WAL for a particular region. Pass null for getting the
* default (common) WAL */
- HLog getWAL(HRegionInfo regionInfo) throws IOException;
+ WAL getWAL(HRegionInfo regionInfo) throws IOException;
/**
* @return Implementation of {@link CompactionRequestor} or null.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 335422c..eeffa8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -35,7 +35,8 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -69,6 +70,7 @@ public class SplitLogWorker implements Runnable {
private SplitLogWorkerCoordination coordination;
private Configuration conf;
private RegionServerServices server;
+
public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
TaskExecutor splitTaskExecutor) {
this.server = server;
@@ -81,7 +83,8 @@ public class SplitLogWorker implements Runnable {
}
public SplitLogWorker(final Server hserver, final Configuration conf,
- final RegionServerServices server, final LastSequenceId sequenceIdChecker) {
+ final RegionServerServices server, final LastSequenceId sequenceIdChecker,
+ final WALFactory factory) {
this(server, conf, server, new TaskExecutor() {
@Override
public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
@@ -98,8 +101,8 @@ public class SplitLogWorker implements Runnable {
// interrupted or has encountered a transient error and when it has
// encountered a bad non-retry-able persistent error.
try {
- if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
- fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode)) {
+ if (!WALSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
+ fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
@@ -152,6 +155,7 @@ public class SplitLogWorker implements Runnable {
LOG.info("SplitLogWorker " + server.getServerName() + " exiting");
}
}
+
/**
* If the worker is doing a task i.e. splitting a log file then stop the task.
* It doesn't exit the worker thread.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
deleted file mode 100644
index 8e2ee62..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.handler;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SplitLogCounters;
-import org.apache.hadoop.hbase.SplitLogTask;
-import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
-import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-
-/**
- * Handles log splitting a wal
- */
-@InterfaceAudience.Private
-public class HLogSplitterHandler extends EventHandler {
- private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class);
- private final ServerName serverName;
- private final CancelableProgressable reporter;
- private final AtomicInteger inProgressTasks;
- private final TaskExecutor splitTaskExecutor;
- private final RecoveryMode mode;
- private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails;
- private final SplitLogWorkerCoordination coordination;
-
-
- public HLogSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
- SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
- AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
- super(server, EventType.RS_LOG_REPLAY);
- this.splitTaskDetails = splitDetails;
- this.coordination = coordination;
- this.reporter = reporter;
- this.inProgressTasks = inProgressTasks;
- this.inProgressTasks.incrementAndGet();
- this.serverName = server.getServerName();
- this.splitTaskExecutor = splitTaskExecutor;
- this.mode = mode;
- }
-
- @Override
- public void process() throws IOException {
- long startTime = System.currentTimeMillis();
- try {
- Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
- switch (status) {
- case DONE:
- coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode),
- SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
- break;
- case PREEMPTED:
- SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
- LOG.warn("task execution prempted " + splitTaskDetails.getWALFile());
- break;
- case ERR:
- if (server != null && !server.isStopped()) {
- coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
- SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
- break;
- }
- // if the RS is exiting then there is probably a tons of stuff
- // that can go wrong. Resign instead of signaling error.
- //$FALL-THROUGH$
- case RESIGNED:
- if (server != null && server.isStopped()) {
- LOG.info("task execution interrupted because worker is exiting "
- + splitTaskDetails.toString());
- }
- coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
- SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
- break;
- }
- } finally {
- LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in "
- + (System.currentTimeMillis() - startTime) + "ms");
- this.inProgressTasks.decrementAndGet();
- }
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
new file mode 100644
index 0000000..9a03192
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+
+/**
+ * Handles log splitting a wal
+ */
+@InterfaceAudience.Private
+public class WALSplitterHandler extends EventHandler {
+ private static final Log LOG = LogFactory.getLog(WALSplitterHandler.class);
+ private final ServerName serverName;
+ private final CancelableProgressable reporter;
+ private final AtomicInteger inProgressTasks;
+ private final TaskExecutor splitTaskExecutor;
+ private final RecoveryMode mode;
+ private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails;
+ private final SplitLogWorkerCoordination coordination;
+
+
+ public WALSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
+ SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
+ AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
+ super(server, EventType.RS_LOG_REPLAY);
+ this.splitTaskDetails = splitDetails;
+ this.coordination = coordination;
+ this.reporter = reporter;
+ this.inProgressTasks = inProgressTasks;
+ this.inProgressTasks.incrementAndGet();
+ this.serverName = server.getServerName();
+ this.splitTaskExecutor = splitTaskExecutor;
+ this.mode = mode;
+ }
+
+ @Override
+ public void process() throws IOException {
+ long startTime = System.currentTimeMillis();
+ try {
+ Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
+ switch (status) {
+ case DONE:
+ coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode),
+ SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
+ break;
+ case PREEMPTED:
+ SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
+ LOG.warn("task execution preempted " + splitTaskDetails.getWALFile());
+ break;
+ case ERR:
+ if (server != null && !server.isStopped()) {
+ coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
+ SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
+ break;
+ }
+ // if the RS is exiting then there is probably a ton of stuff
+ // that can go wrong. Resign instead of signaling error.
+ //$FALL-THROUGH$
+ case RESIGNED:
+ if (server != null && server.isStopped()) {
+ LOG.info("task execution interrupted because worker is exiting "
+ + splitTaskDetails.toString());
+ }
+ coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
+ SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
+ break;
+ }
+ } finally {
+ LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in "
+ + (System.currentTimeMillis() - startTime) + "ms");
+ this.inProgressTasks.decrementAndGet();
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index d8da412..12af619 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -26,17 +26,18 @@ import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.Dictionary;
/**
- * Context that holds the various dictionaries for compression in HLog.
+ * Context that holds the various dictionaries for compression in WAL.
*/
@InterfaceAudience.Private
-class CompressionContext {
+public class CompressionContext {
static final String ENABLE_WAL_TAGS_COMPRESSION =
"hbase.regionserver.wal.tags.enablecompression";
- final Dictionary regionDict;
- final Dictionary tableDict;
- final Dictionary familyDict;
+ // visible only for WALKey, until we move everything into o.a.h.h.wal
+ public final Dictionary regionDict;
+ public final Dictionary tableDict;
+ public final Dictionary familyDict;
final Dictionary qualifierDict;
final Dictionary rowDict;
// Context used for compressing tags
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
index b75a7cf..4032cde 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -33,9 +33,13 @@ import org.apache.hadoop.io.WritableUtils;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+
/**
* A set of static functions for running our custom WAL compression/decompression.
- * Also contains a command line tool to compress and uncompress HLogs.
+ * Also contains a command line tool to compress and uncompress WALs.
*/
@InterfaceAudience.Private
public class Compressor {
@@ -56,8 +60,8 @@ public class Compressor {
private static void printHelp() {
System.err.println("usage: Compressor