From ca7a61b2fdbfc321e7c896c3d34fd4d4c48e3eed Mon Sep 17 00:00:00 2001 From: ulysses <646303253@qq.com> Date: Tue, 14 Aug 2018 14:13:03 +0800 Subject: [PATCH] Update HBaseResourceStore.java Avoid resource inconsistent caused by hbase rpc timeout --- .../storage/hbase/HBaseResourceStore.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 23df556beb..2c7494e0a7 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; @@ -317,13 +318,20 @@ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldT byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); Put put = buildPut(resPath, newTS, row, content, table); - boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); - logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS - + ", operation result: " + ok); - if (!ok) { + try { + boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); + logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS + ", operation result: " + ok); + if (!ok) { + long real = getResourceTimestampImpl(resPath); + throw new WriteConflictException( + "Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); + } + } catch (RetriesExhaustedException e){ long real = getResourceTimestampImpl(resPath); - throw new WriteConflictException( - "Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); + // rpc timeout but resource has been already updated + if(newTS != real){ + throw e; + } } return newTS;