From 31497605b04d94a210b791a1cc0e54bf552b240f Mon Sep 17 00:00:00 2001 From: John Zhao Date: Wed, 2 Jan 2019 10:12:05 +0800 Subject: [PATCH] KYLIN-1575 Timeline-consistent High Available Reads for metadata table --- .../org/apache/kylin/common/KylinConfigBase.java | 9 +++++++++ .../src/main/resources/kylin-defaults.properties | 3 +++ .../kylin/storage/hbase/HBaseConnection.java | 14 ++++++++++++-- .../kylin/storage/hbase/HBaseResourceStore.java | 9 +++++++++ 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index b63062e31a..615ebccdec 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1929,4 +1929,13 @@ public int getJdbcResourceStoreMaxCellSize() { public String getJdbcSourceAdaptor() { return getOptional("kylin.source.jdbc.adaptor"); } + + public int getHBaseResourceStoreReplication() { + try { + return Integer.parseInt(getOptional("kylin.metadata.hbase-replication", "1")); + } catch (NumberFormatException e) { + logger.error("Invalid number of 'kylin.metadata.hbase-replication'", e); + return 1; + } + } } diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index 6f2db9a903..dd667b4791 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -23,6 +23,9 @@ kylin.metadata.url=kylin_metadata@hbase # metadata cache sync retry times kylin.metadata.sync-retries=3 +# metadata replica in hbase region servers +kylin.metadata.hbase-replication=1 + # Working folder in HDFS, better be qualified absolute path, make sure user has the right permission to this directory kylin.env.hdfs-working-dir=/kylin diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java index 53e8a686f7..d34d5c7c22 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java @@ -300,11 +300,12 @@ public static void createHTableIfNeeded(Connection conn, String table, String... TableName tableName = TableName.valueOf(table); DistributedLock lock = null; String lockPath = getLockPath(table); - + int replication = KylinConfig.getInstanceFromEnv().getHBaseResourceStoreReplication(); try { if (tableExists(conn, table)) { logger.debug("HTable '" + table + "' already exists"); - Set existingFamilies = getFamilyNames(admin.getTableDescriptor(tableName)); + HTableDescriptor hTableDescriptor = admin.getTableDescriptor(tableName); + Set existingFamilies = getFamilyNames(hTableDescriptor); boolean wait = false; for (String family : families) { if (existingFamilies.contains(family) == false) { @@ -321,6 +322,14 @@ public static void createHTableIfNeeded(Connection conn, String table, String... logger.warn("", e); } } + + if (hTableDescriptor.getRegionReplication() != replication) { + logger.debug("Update HTable '" + table + "' replication to {}", replication); + hTableDescriptor.setRegionReplication(replication); + admin.disableTable(tableName); + admin.modifyTable(tableName, hTableDescriptor); + admin.enableTable(tableName); + } return; } @@ -344,6 +353,7 @@ public static void createHTableIfNeeded(Connection conn, String table, String... } } + desc.setRegionReplication(replication); admin.createTable(desc); logger.debug("HTable '" + table + "' created"); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 14c5ea7cf0..cb80aa6aa6 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -183,6 +184,7 @@ private void visitFolder(String folderPath, VisitFilter filter, boolean loadCont Table table = getConnection().getTable(TableName.valueOf(tableName)); Scan scan = new Scan(startRow, endRow); + scan.setConsistency(Consistency.TIMELINE); scan.addColumn(B_FAMILY, B_COLUMN_TS); if (loadContent) { scan.addColumn(B_FAMILY, B_COLUMN); @@ -199,6 +201,9 @@ private void visitFolder(String folderPath, VisitFilter filter, boolean loadCont for (Result r : scanner) { String path = Bytes.toString(r.getRow()); assert path.startsWith(lookForPrefix); + if (r.isStale()) { + logger.warn("Kylin reads stale data at: {}", path); + } int cut = path.indexOf('/', folderPrefix.length()); String directChild = cut < 0 ? path : path.substring(0, cut); visitor.visit(directChild, path, r); @@ -395,6 +400,7 @@ private Result internalGetFromHTable(Table table, String path, boolean fetchCont byte[] rowkey = Bytes.toBytes(path); Get get = new Get(rowkey); + get.setConsistency(Consistency.TIMELINE); if (!fetchContent && !fetchTimestamp) { get.setCheckExistenceOnly(true); @@ -406,6 +412,9 @@ private Result internalGetFromHTable(Table table, String path, boolean fetchCont } Result result = table.get(get); + if (result.isStale()) { + logger.warn("Kylin reads stale data at: {}", path); + } boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists())); return exists ? result : null; }