From d42d62b5ae8e93132eb38e8614e598075298ebf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=B0=8F=E4=BF=9D?= Date: Thu, 29 Nov 2018 10:46:18 +0800 Subject: [PATCH] improve close method --- .../org/apache/hadoop/hbase/client/HTable.java | 35 +++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index fb69a2530b..820743fbc4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -274,6 +274,7 @@ public class HTable implements Table { private Pair, List> getKeysAndRegionsInRange( final byte[] startKey, final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException { + checkClose(); final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW); if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { throw new IllegalArgumentException( @@ -300,6 +301,7 @@ public class HTable implements Table { */ @Override public ResultScanner getScanner(Scan scan) throws IOException { + checkClose(); if (scan.getCaching() <= 0) { scan.setCaching(scannerCaching); } @@ -362,6 +364,7 @@ public class HTable implements Table { private Result get(Get get, final boolean checkExistenceOnly) throws IOException { // if we are changing settings to the get, clone it. + checkClose(); if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) { get = ReflectionUtils.newInstance(get.getClass(), get); get.setCheckExistenceOnly(checkExistenceOnly); @@ -397,6 +400,7 @@ public class HTable implements Table { @Override public Result[] get(List gets) throws IOException { + checkClose(); if (gets.size() == 1) { return new Result[]{get(gets.get(0))}; } @@ -440,6 +444,7 @@ public class HTable implements Table { public void batch(final List actions, final Object[] results, int rpcTimeout) throws InterruptedException, IOException { + checkClose(); AsyncProcessTask task = AsyncProcessTask.newBuilder() .setPool(pool) .setTableName(tableName) @@ -460,6 +465,7 @@ public class HTable implements Table { public void batchCallback( final List actions, final Object[] results, final Batch.Callback callback) throws IOException, InterruptedException { + checkClose(); doBatchWithCallback(actions, results, callback, connection, pool, tableName); } @@ -488,6 +494,7 @@ public class HTable implements Table { @Override public void delete(final Delete delete) throws IOException { + checkClose(); ClientServiceCallable callable = new ClientServiceCallable(this.connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(), delete.getPriority()) { @@ -506,6 +513,7 @@ public class HTable implements Table { @Override public void delete(final List deletes) throws IOException { + checkClose(); Object[] results = new Object[deletes.size()]; try { batch(deletes, results, writeRpcTimeoutMs); @@ -527,6 +535,7 @@ public class HTable implements Table { @Override public void put(final Put put) throws IOException { + checkClose(); validatePut(put); ClientServiceCallable callable = new ClientServiceCallable(this.connection, getName(), put.getRow(), @@ -558,6 +567,7 @@ public class HTable implements Table { @Override public void mutateRow(final RowMutations rm) throws IOException { + checkClose(); CancellableRegionServerCallable callable = new CancellableRegionServerCallable(this.connection, getName(), rm.getRow(), rpcControllerFactory.newController(), writeRpcTimeoutMs, @@ -599,6 +609,7 @@ public class HTable implements Table { @Override public Result append(final Append append) throws IOException { + checkClose(); checkHasFamilies(append); NoncedRegionServerCallable callable = new NoncedRegionServerCallable(this.connection, getName(), append.getRow(), @@ -618,6 +629,7 @@ public class HTable implements Table { @Override public Result increment(final Increment increment) throws IOException { + checkClose(); checkHasFamilies(increment); NoncedRegionServerCallable callable = new NoncedRegionServerCallable(this.connection, getName(), increment.getRow(), @@ -646,6 +658,7 @@ public class HTable implements Table { public long incrementColumnValue(final byte [] row, final byte [] family, final byte [] qualifier, final long amount, final Durability durability) throws IOException { + checkClose(); NullPointerException npe = null; if (row == null) { npe = new NullPointerException("row is null"); @@ -700,6 +713,7 @@ public class HTable implements Table { private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier, final String opName, final byte[] value, final TimeRange timeRange, final Put put) throws IOException { + checkClose(); ClientServiceCallable callable = new ClientServiceCallable(this.connection, getName(), row, this.rpcControllerFactory.newController(), put.getPriority()) { @@ -742,6 +756,7 @@ public class HTable implements Table { private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final String opName, final byte[] value, final TimeRange timeRange, final Delete delete) throws IOException { + checkClose(); CancellableRegionServerCallable callable = new CancellableRegionServerCallable(this.connection, getName(), row, this.rpcControllerFactory.newController(), writeRpcTimeoutMs, @@ -775,12 +790,14 @@ public class HTable implements Table { @Override public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { + checkClose(); return new CheckAndMutateBuilderImpl(row, family); } private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier, final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm) throws IOException { + checkClose(); CancellableRegionServerCallable callable = new CancellableRegionServerCallable(connection, getName(), rm.getRow(), rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(), @@ -959,6 +976,7 @@ public class HTable implements Table { @Override public CoprocessorRpcChannel coprocessorService(byte[] row) { + checkClose(); return new RegionCoprocessorRpcChannel(connection, tableName, row); } @@ -966,6 +984,7 @@ public class HTable implements Table { public Map coprocessorService(final Class service, byte[] startKey, byte[] endKey, final Batch.Call callable) throws ServiceException, Throwable { + checkClose(); final Map results = Collections.synchronizedMap( new TreeMap(Bytes.BYTES_COMPARATOR)); coprocessorService(service, startKey, endKey, callable, new Batch.Callback() { @@ -1107,6 +1126,7 @@ public class HTable implements Table { public Map batchCoprocessorService( Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { + checkClose(); final Map results = Collections.synchronizedMap(new TreeMap( Bytes.BYTES_COMPARATOR)); batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, @@ -1121,12 +1141,25 @@ public class HTable implements Table { return results; } + private void checkClose() { + if (isClosed()) { + throw new IllegalStateException("Table=" + tableName.getName() + + " already closed"); + } + } + + + public boolean isClosed() { + return closed; + } + + @Override public void batchCoprocessorService( final Descriptors.MethodDescriptor methodDescriptor, final Message request, byte[] startKey, byte[] endKey, final R responsePrototype, final Callback callback) throws ServiceException, Throwable { - + checkClose(); if (startKey == null) { startKey = HConstants.EMPTY_START_ROW; } -- 2.14.1