diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index 85694db..28c1829 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.coprocessor.*; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.util.ObjectReference; import java.io.IOException; import java.util.List; @@ -80,1598 +81,780 @@ public class MasterCoprocessorHost } public boolean preCreateNamespace(final NamespaceDescriptor ns) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preCreateNamespace(ctx, ns); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preCreateNamespace(ctx, ns); } - } - return bypass; + }); } public void postCreateNamespace(final NamespaceDescriptor ns) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postCreateNamespace(ctx, ns); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCreateNamespace(ctx, ns); } - } + }); } public boolean preDeleteNamespace(final String namespaceName) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preDeleteNamespace(ctx, namespaceName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteNamespace(ctx, namespaceName); } - } - return bypass; + }); } public void postDeleteNamespace(final String namespaceName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postDeleteNamespace(ctx, namespaceName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteNamespace(ctx, namespaceName); } - } + }); } public boolean preModifyNamespace(final NamespaceDescriptor ns) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preModifyNamespace(ctx, ns); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preModifyNamespace(ctx, ns); } - } - return bypass; + }); } public void postModifyNamespace(final NamespaceDescriptor ns) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postModifyNamespace(ctx, ns); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postModifyNamespace(ctx, ns); } - } + }); } /* Implementation of hooks for invoking MasterObservers */ public void preCreateTable(final HTableDescriptor htd, final HRegionInfo[] regions) - throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preCreateTable(ctx, htd, regions); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preCreateTable(ctx, htd, regions); } - } + }); } public void postCreateTable(final HTableDescriptor htd, final HRegionInfo[] regions) - throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postCreateTable(ctx, htd, regions); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCreateTable(ctx, htd, regions); } - } + }); } public void preCreateTableHandler(final HTableDescriptor htd, final HRegionInfo[] regions) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preCreateTableHandler(ctx, htd, regions); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preCreateTableHandler(ctx, htd, regions); } - } + }); } public void postCreateTableHandler(final HTableDescriptor htd, final HRegionInfo[] regions) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postCreateTableHandler(ctx, htd, regions); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCreateTableHandler(ctx, htd, regions); } - } + }); } public void preDeleteTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preDeleteTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteTable(ctx, tableName); } - } + }); } public void postDeleteTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postDeleteTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteTable(ctx, tableName); } - } + }); } public void preDeleteTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preDeleteTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteTableHandler(ctx, tableName); } - } + }); } public void postDeleteTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postDeleteTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteTableHandler(ctx, tableName); } - } + }); } - public void preTruncateTable(TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - try { - ((MasterObserver)env.getInstance()).preTruncateTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } - if (ctx.shouldComplete()) { - break; - } + public void preTruncateTable(final TableName tableName) throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preTruncateTable(ctx, tableName); } - } + }); } - public void postTruncateTable(TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - try { - ((MasterObserver)env.getInstance()).postTruncateTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } - if (ctx.shouldComplete()) { - break; - } + public void postTruncateTable(final TableName tableName) throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postTruncateTable(ctx, tableName); } - } + }); } - public void preTruncateTableHandler(TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - try { - ((MasterObserver) env.getInstance()).preTruncateTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } - if (ctx.shouldComplete()) { - break; - } + public void preTruncateTableHandler(final TableName tableName) throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preTruncateTableHandler(ctx, tableName); } - } + }); } - public void postTruncateTableHandler(TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - try { - ((MasterObserver) env.getInstance()).postTruncateTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } - if (ctx.shouldComplete()) { - break; - } + public void postTruncateTableHandler(final TableName tableName) throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postTruncateTableHandler(ctx, tableName); } - } + }); } public void preModifyTable(final TableName tableName, final HTableDescriptor htd) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preModifyTable(ctx, tableName, htd); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preModifyTable(ctx, tableName, htd); } - } + }); } public void postModifyTable(final TableName tableName, final HTableDescriptor htd) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postModifyTable(ctx, tableName, htd); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postModifyTable(ctx, tableName, htd); } - } + }); } public void preModifyTableHandler(final TableName tableName, final HTableDescriptor htd) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preModifyTableHandler(ctx, tableName, htd); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preModifyTableHandler(ctx, tableName, htd); } - } + }); } public void postModifyTableHandler(final TableName tableName, final HTableDescriptor htd) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postModifyTableHandler(ctx, tableName, htd); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postModifyTableHandler(ctx, tableName, htd); } - } + }); } public boolean preAddColumn(final TableName tableName, final HColumnDescriptor column) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preAddColumn(ctx, tableName, column); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preAddColumn(ctx, tableName, column); } - } - return bypass; + }); } public void postAddColumn(final TableName tableName, final HColumnDescriptor column) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postAddColumn(ctx, tableName, column); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postAddColumn(ctx, tableName, column); } - } + }); } public boolean preAddColumnHandler(final TableName tableName, final HColumnDescriptor column) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preAddColumnHandler(ctx, tableName, column); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preAddColumnHandler(ctx, tableName, column); } - } - return bypass; + }); } public void postAddColumnHandler(final TableName tableName, final HColumnDescriptor column) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postAddColumnHandler(ctx, tableName, column); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postAddColumnHandler(ctx, tableName, column); } - } + }); } public boolean preModifyColumn(final TableName tableName, final HColumnDescriptor descriptor) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preModifyColumn(ctx, tableName, descriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preModifyColumn(ctx, tableName, descriptor); } - } - return bypass; + }); } public void postModifyColumn(final TableName tableName, final HColumnDescriptor descriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postModifyColumn(ctx, tableName, descriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postModifyColumn(ctx, tableName, descriptor); } - } + }); } public boolean preModifyColumnHandler(final TableName tableName, final HColumnDescriptor descriptor) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preModifyColumnHandler(ctx, tableName, descriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preModifyColumnHandler(ctx, tableName, descriptor); } - } - return bypass; + }); } public void postModifyColumnHandler(final TableName tableName, final HColumnDescriptor descriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postModifyColumnHandler(ctx, tableName, descriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postModifyColumnHandler(ctx, tableName, descriptor); } - } + }); } public boolean preDeleteColumn(final TableName tableName, final byte [] c) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preDeleteColumn(ctx, tableName, c); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteColumn(ctx, tableName, c); } - } - return bypass; + }); } public void postDeleteColumn(final TableName tableName, final byte [] c) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postDeleteColumn(ctx, tableName, c); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteColumn(ctx, tableName, c); } - } + }); } public boolean preDeleteColumnHandler(final TableName tableName, final byte[] c) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preDeleteColumnHandler(ctx, tableName, c); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteColumnHandler(ctx, tableName, c); } - } - return bypass; + }); } public void postDeleteColumnHandler(final TableName tableName, final byte[] c) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postDeleteColumnHandler(ctx, tableName, c); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteColumnHandler(ctx, tableName, c); } - } + }); } public void preEnableTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preEnableTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preEnableTable(ctx, tableName); } - } + }); } public void postEnableTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postEnableTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postEnableTable(ctx, tableName); } - } + }); } public void preEnableTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preEnableTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preEnableTableHandler(ctx, tableName); } - } + }); } public void postEnableTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postEnableTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postEnableTableHandler(ctx, tableName); } - } + }); } public void preDisableTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preDisableTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDisableTable(ctx, tableName); } - } + }); } public void postDisableTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postDisableTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDisableTable(ctx, tableName); } - } + }); } public void preDisableTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preDisableTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDisableTableHandler(ctx, tableName); } - } + }); } public void postDisableTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postDisableTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDisableTableHandler(ctx, tableName); } - } + }); } public boolean preMove(final HRegionInfo region, final ServerName srcServer, final ServerName destServer) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preMove(ctx, region, srcServer, destServer); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preMove(ctx, region, srcServer, destServer); } - } - return bypass; + }); } public void postMove(final HRegionInfo region, final ServerName srcServer, final ServerName destServer) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postMove(ctx, region, srcServer, destServer); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postMove(ctx, region, srcServer, destServer); } - } + }); } public boolean preAssign(final HRegionInfo regionInfo) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preAssign(ctx, regionInfo); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preAssign(ctx, regionInfo); } - } - return bypass; + }); } public void postAssign(final HRegionInfo regionInfo) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postAssign(ctx, regionInfo); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postAssign(ctx, regionInfo); } - } + }); } public boolean preUnassign(final HRegionInfo regionInfo, final boolean force) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preUnassign(ctx, regionInfo, force); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preUnassign(ctx, regionInfo, force); } - } - return bypass; + }); } public void postUnassign(final HRegionInfo regionInfo, final boolean force) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postUnassign(ctx, regionInfo, force); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postUnassign(ctx, regionInfo, force); } - } + }); } public void preRegionOffline(final HRegionInfo regionInfo) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preRegionOffline(ctx, regionInfo); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preRegionOffline(ctx, regionInfo); } - } + }); } public void postRegionOffline(final HRegionInfo regionInfo) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postRegionOffline(ctx, regionInfo); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postRegionOffline(ctx, regionInfo); } - } + }); } public boolean preBalance() throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preBalance(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preBalance(ctx); } - } - return bypass; + }); } public void postBalance(final List plans) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postBalance(ctx, plans); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postBalance(ctx, plans); } - } + }); } public boolean preBalanceSwitch(final boolean b) throws IOException { - boolean balance = b; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - balance = ((MasterObserver)env.getInstance()).preBalanceSwitch(ctx, balance); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(b); + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preBalanceSwitch(ctx, res.get())); } - } - return balance; + }); + return res.get(); } - void postBalanceSwitch(final boolean oldValue, final boolean newValue) + public void postBalanceSwitch(final boolean oldValue, final boolean newValue) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postBalanceSwitch(ctx, oldValue, newValue); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postBalanceSwitch(ctx, oldValue, newValue); } - } - } - - public void preShutdown() throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preShutdown(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + }); + } + + public void preShutdown() throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preShutdown(ctx); } - // invoke coprocessor stop method - shutdown(env); - } + @Override + public void postEnvCall(MasterEnvironment env) { + // invoke coprocessor stop method + shutdown(env); + } + }); } public void preStopMaster() throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preStopMaster(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preStopMaster(ctx); } - // invoke coprocessor stop method - shutdown(env); - } + @Override + public void postEnvCall(MasterEnvironment env) { + // invoke coprocessor stop method + shutdown(env); + } + }); } public void preMasterInitialization() throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preMasterInitialization(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preMasterInitialization(ctx); } - } + }); } public void postStartMaster() throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postStartMaster(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postStartMaster(ctx); } - } + }); } public void preSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preSnapshot(ctx, snapshot, hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void postSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postSnapshot(ctx, snapshot, hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void preCloneSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preCloneSnapshot(ctx, snapshot, - hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preCloneSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void postCloneSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postCloneSnapshot(ctx, snapshot, - hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCloneSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void preRestoreSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preRestoreSnapshot(ctx, snapshot, - hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preRestoreSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void postRestoreSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postRestoreSnapshot(ctx, snapshot, - hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postRestoreSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void preDeleteSnapshot(final SnapshotDescription snapshot) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preDeleteSnapshot(ctx, snapshot); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteSnapshot(ctx, snapshot); } - } + }); } public void postDeleteSnapshot(final SnapshotDescription snapshot) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postDeleteSnapshot(ctx, snapshot); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteSnapshot(ctx, snapshot); } - } + }); } public boolean preGetTableDescriptors(final List tableNamesList, final List descriptors) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preGetTableDescriptors(ctx, - tableNamesList, descriptors); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preGetTableDescriptors(ctx, tableNamesList, descriptors); } - } - return bypass; + }); } public void postGetTableDescriptors(final List descriptors) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postGetTableDescriptors(ctx, descriptors); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postGetTableDescriptors(ctx, descriptors); } - } + }); } public void preTableFlush(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - try { - ((MasterObserver)env.getInstance()).preTableFlush(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preTableFlush(ctx, tableName); } - } + }); } public void postTableFlush(final TableName tableName) throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postTableFlush(ctx, tableName); + } + }); + } + + private abstract class CoprocessorOperation { + public abstract void call(MasterObserver oserver, + ObserverContext ctx) throws IOException; + + public void postEnvCall(MasterEnvironment env) { + } + } + + private boolean execOperation(final CoprocessorOperation operation) throws IOException { + boolean bypass = false; ObserverContext ctx = null; for (MasterEnvironment env: coprocessors) { if (env.getInstance() instanceof MasterObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); + Thread currentThread = Thread.currentThread(); + ClassLoader cl = currentThread.getContextClassLoader(); try { - ((MasterObserver)env.getInstance()).postTableFlush(ctx, tableName); + currentThread.setContextClassLoader(env.getClassLoader()); + operation.call(((MasterObserver)env.getInstance()), ctx); } catch (Throwable e) { handleCoprocessorThrowable(env, e); + } finally { + currentThread.setContextClassLoader(cl); } + bypass |= ctx.shouldBypass(); if (ctx.shouldComplete()) { break; } } + operation.postEnvCall(env); } + return bypass; } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 6329d47..6b34dcf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ObjectReference; import org.apache.hadoop.hbase.util.Pair; import com.google.common.collect.ImmutableList; @@ -309,83 +310,46 @@ public class RegionCoprocessorHost * @throws IOException Signals that an I/O exception has occurred. */ public void preOpen() throws IOException { - - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preOpen(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preOpen(ctx); } - } - + }); } /** * Invoked after a region open */ public void postOpen() { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postOpen(ctx); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; + try { + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postOpen(ctx); } - } + }); + } catch (IOException e) { + LOG.warn(e); } - } /** * Invoked after log replay on region */ public void postLogReplay() { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postLogReplay(ctx); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; + try { + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postLogReplay(ctx); } - } + }); + } catch (IOException e) { + LOG.warn(e); } } @@ -394,25 +358,13 @@ public class RegionCoprocessorHost * @param abortRequested true if the server is aborting */ public void preClose(final boolean abortRequested) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preClose(ctx, abortRequested); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); + execOperation(false, new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preClose(ctx, abortRequested); } - } - + }); } /** @@ -420,26 +372,20 @@ public class RegionCoprocessorHost * @param abortRequested true if the server is aborting */ public void postClose(final boolean abortRequested) { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postClose(ctx, abortRequested); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env, e); - } finally { - currentThread.setContextClassLoader(cl); + try { + execOperation(false, new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postClose(ctx, abortRequested); } - env.offerExecutionLatency(System.nanoTime() - startTime); - } - shutdown(env); + public void postEnvCall(RegionEnvironment env) { + shutdown(env); + } + }); + } catch (IOException e) { + LOG.warn(e); } - } /** @@ -449,30 +395,16 @@ public class RegionCoprocessorHost public InternalScanner preCompactScannerOpen(final Store store, final List scanners, final ScanType scanType, final long earliestPutTs, final CompactionRequest request) throws IOException { - ObserverContext ctx = null; - InternalScanner s = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, - scanners, scanType, earliestPutTs, s, request); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(null); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preCompactScannerOpen(ctx, store, scanners, scanType, + earliestPutTs, res.get(), request)); } - } - return s; + }); + return res.get(); } /** @@ -486,31 +418,13 @@ public class RegionCoprocessorHost */ public boolean preCompactSelection(final Store store, final List candidates, final CompactionRequest request) throws IOException { - ObserverContext ctx = null; - boolean bypass = false; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates, - request); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preCompactSelection(ctx, store, candidates, request); } - } - return bypass; + }); } /** @@ -522,29 +436,17 @@ public class RegionCoprocessorHost */ public void postCompactSelection(final Store store, final ImmutableList selected, final CompactionRequest request) { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected, - request); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; + try { + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCompactSelection(ctx, store, selected, request); } - } + }); + } catch (IOException e) { + LOG.warn(e); } - } /** @@ -557,32 +459,15 @@ public class RegionCoprocessorHost */ public InternalScanner preCompact(final Store store, final InternalScanner scanner, final ScanType scanType, final CompactionRequest request) throws IOException { - ObserverContext ctx = null; - boolean bypass = false; - InternalScanner s = scanner; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preCompact(ctx, store, s, scanType, - request); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(scanner); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preCompact(ctx, store, res.get(), scanType, request)); } - } - return bypass ? null : s; + }); + return bypass ? null : res.get(); } /** @@ -594,58 +479,30 @@ public class RegionCoprocessorHost */ public void postCompact(final Store store, final StoreFile resultFile, final CompactionRequest request) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCompact(ctx, store, resultFile, request); } - } + }); } /** * Invoked before a memstore flush * @throws IOException */ - public InternalScanner preFlush(final Store store, final InternalScanner scanner) throws IOException { - ObserverContext ctx = null; - boolean bypass = false; - InternalScanner s = scanner; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver)env.getInstance()).preFlush(ctx, store, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + public InternalScanner preFlush(final Store store, final InternalScanner scanner) + throws IOException { + final ObjectReference res = new ObjectReference(scanner); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preFlush(ctx, store, res.get())); } - } - return bypass ? null : s; + }); + return bypass ? null : res.get(); } /** @@ -653,27 +510,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preFlush() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preFlush(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preFlush(ctx); } - } + }); } /** @@ -683,30 +526,15 @@ public class RegionCoprocessorHost */ public InternalScanner preFlushScannerOpen(final Store store, final KeyValueScanner memstoreScanner) throws IOException { - ObserverContext ctx = null; - InternalScanner s = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preFlushScannerOpen(ctx, store, - memstoreScanner, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, res.get())); } - } - return s; + }); + return res.get(); } /** @@ -714,27 +542,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postFlush() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postFlush(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postFlush(ctx); } - } + }); } /** @@ -742,27 +556,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postFlush(final Store store, final StoreFile storeFile) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postFlush(ctx, store, storeFile); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postFlush(ctx, store, storeFile); } - } + }); } /** @@ -770,28 +570,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preSplit() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preSplit(ctx); } - } - + }); } /** @@ -799,28 +584,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preSplit(final byte[] splitRow) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preSplit(ctx, splitRow); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preSplit(ctx, splitRow); } - } - + }); } /** @@ -830,79 +600,34 @@ public class RegionCoprocessorHost * @throws IOException */ public void postSplit(final HRegion l, final HRegion r) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postSplit(ctx, l, r); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postSplit(ctx, l, r); } - } + }); } - public boolean preSplitBeforePONR(final byte[] splitKey, + public boolean preSplitBeforePONR(final byte[] splitKey, final List metaEntries) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preSplitBeforePONR(ctx, splitKey, metaEntries); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preSplitBeforePONR(ctx, splitKey, metaEntries); } - } - return bypass; + }); } public void preSplitAfterPONR() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preSplitAfterPONR(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preSplitAfterPONR(ctx); } - } + }); } /** @@ -910,27 +635,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preRollBackSplit() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preRollBackSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preRollBackSplit(ctx); } - } + }); } /** @@ -938,27 +649,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postRollBackSplit() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postRollBackSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postRollBackSplit(ctx); } - } + }); } /** @@ -966,27 +663,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postCompleteSplit() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postCompleteSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCompleteSplit(ctx); } - } + }); } // RegionObserver support @@ -1000,30 +683,13 @@ public class RegionCoprocessorHost */ public boolean preGetClosestRowBefore(final byte[] row, final byte[] family, final Result result) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preGetClosestRowBefore(ctx, row, family, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preGetClosestRowBefore(ctx, row, family, result); } - } - return bypass; + }); } /** @@ -1034,27 +700,13 @@ public class RegionCoprocessorHost */ public void postGetClosestRowBefore(final byte[] row, final byte[] family, final Result result) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postGetClosestRowBefore(ctx, row, family, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postGetClosestRowBefore(ctx, row, family, result); } - } + }); } /** @@ -1064,30 +716,13 @@ public class RegionCoprocessorHost */ public boolean preGet(final Get get, final List results) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preGetOp(ctx, get, results); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preGetOp(ctx, get, results); } - } - return bypass; + }); } /** @@ -1096,28 +731,14 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public void postGet(final Get get, final List results) - throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postGetOp(ctx, get, results); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + throws IOException { + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postGetOp(ctx, get, results); } - } + }); } /** @@ -1127,31 +748,15 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public Boolean preExists(final Get get) throws IOException { - boolean bypass = false; - boolean exists = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - exists = ((RegionObserver)env.getInstance()).preExists(ctx, get, exists); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference exists = new ObjectReference(false); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + exists.set(oserver.preExists(ctx, get, exists.get())); } - } - return bypass ? exists : null; + }); + return bypass ? exists.get() : null; } /** @@ -1162,28 +767,15 @@ public class RegionCoprocessorHost */ public boolean postExists(final Get get, boolean exists) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - exists = ((RegionObserver)env.getInstance()).postExists(ctx, get, exists); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(exists); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.postExists(ctx, get, res.get())); } - } - return exists; + }); + return res.get(); } /** @@ -1195,30 +787,13 @@ public class RegionCoprocessorHost */ public boolean prePut(final Put put, final WALEdit edit, final Durability durability) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).prePut(ctx, put, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.prePut(ctx, put, edit, durability); } - } - return bypass; + }); } /** @@ -1231,34 +806,15 @@ public class RegionCoprocessorHost * @exception IOException * Exception */ - public boolean prePrepareTimeStampForDeleteVersion(Mutation mutation, - Cell kv, byte[] byteNow, Get get) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()) - .prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, - byteNow, get); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, + final Cell kv, final byte[] byteNow, final Get get) throws IOException { + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get); } - } - return bypass; + }); } /** @@ -1269,27 +825,13 @@ public class RegionCoprocessorHost */ public void postPut(final Put put, final WALEdit edit, final Durability durability) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postPut(ctx, put, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postPut(ctx, put, edit, durability); } - } + }); } /** @@ -1301,30 +843,13 @@ public class RegionCoprocessorHost */ public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preDelete(ctx, delete, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDelete(ctx, delete, edit, durability); } - } - return bypass; + }); } /** @@ -1335,29 +860,15 @@ public class RegionCoprocessorHost */ public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postDelete(ctx, delete, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDelete(ctx, delete, edit, durability); } - } + }); } - + /** * @param miniBatchOp * @return true if default processing should be bypassed @@ -1365,31 +876,13 @@ public class RegionCoprocessorHost */ public boolean preBatchMutate( final MiniBatchOperationInProgress miniBatchOp) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preBatchMutate(ctx, miniBatchOp); } - } - - return bypass; + }); } /** @@ -1398,54 +891,25 @@ public class RegionCoprocessorHost */ public void postBatchMutate( final MiniBatchOperationInProgress miniBatchOp) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postBatchMutate(ctx, miniBatchOp); } - } + }); } public void postBatchMutateIndispensably( final MiniBatchOperationInProgress miniBatchOp, final boolean success) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postBatchMutateIndispensably(ctx, miniBatchOp, - success); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success); } - } + }); } /** @@ -1462,33 +926,17 @@ public class RegionCoprocessorHost public Boolean preCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Put put) - throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preCheckAndPut(ctx, row, family, qualifier, - compareOp, comparator, put, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + throws IOException { + final ObjectReference res = new ObjectReference(false); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preCheckAndPut(ctx, row, family, qualifier, + compareOp, comparator, put, res.get())); } - } - return bypass ? result : null; + }); + return bypass ? res.get() : null; } /** @@ -1505,32 +953,16 @@ public class RegionCoprocessorHost public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Put put) throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preCheckAndPutAfterRowLock(ctx, row, - family, qualifier, compareOp, comparator, put, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(false); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier, + compareOp, comparator, put, res.get())); } - } - return bypass ? result : null; + }); + return bypass ? res.get() : null; } /** @@ -1545,31 +977,17 @@ public class RegionCoprocessorHost public boolean postCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Put put, - boolean result) - throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).postCheckAndPut(ctx, row, family, - qualifier, compareOp, comparator, put, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + boolean result) throws IOException { + final ObjectReference res = new ObjectReference(result); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.postCheckAndPut(ctx, row, family, qualifier, + compareOp, comparator, put, res.get())); } - } - return result; + }); + return res.get(); } /** @@ -1587,32 +1005,16 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Delete delete) throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preCheckAndDelete(ctx, row, family, - qualifier, compareOp, comparator, delete, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(false); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preCheckAndDelete(ctx, row, family, + qualifier, compareOp, comparator, delete, res.get())); } - } - return bypass ? result : null; + }); + return bypass ? res.get() : null; } /** @@ -1629,32 +1031,16 @@ public class RegionCoprocessorHost public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Delete delete) throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preCheckAndDeleteAfterRowLock(ctx, row, - family, qualifier, compareOp, comparator, delete, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(false); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preCheckAndDeleteAfterRowLock(ctx, row, + family, qualifier, compareOp, comparator, delete, res.get())); } - } - return bypass ? result : null; + }); + return bypass ? res.get() : null; } /** @@ -1670,29 +1056,16 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Delete delete, boolean result) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).postCheckAndDelete(ctx, row, family, - qualifier, compareOp, comparator, delete, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(result); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.postCheckAndDelete(ctx, row, family, + qualifier, compareOp, comparator, delete, res.get())); } - } - return result; + }); + return res.get(); } /** @@ -1702,31 +1075,15 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preAppend(final Append append) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preAppend(ctx, append); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(null); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preAppend(ctx, append)); } - } - return bypass ? result : null; + }); + return bypass ? res.get() : null; } /** @@ -1736,31 +1093,15 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preAppendAfterRowLock(final Append append) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preAppendAfterRowLock(ctx, append); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(null); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preAppendAfterRowLock(ctx, append)); } - } - return bypass ? result : null; + }); + return bypass ? res.get() : null; } /** @@ -1770,31 +1111,15 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preIncrement(final Increment increment) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preIncrement(ctx, increment); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(null); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preIncrement(ctx, increment)); } - } - return bypass ? result : null; + }); + return bypass ? res.get() : null; } /** @@ -1804,31 +1129,15 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preIncrementAfterRowLock(final Increment increment) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preIncrementAfterRowLock(ctx, increment); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(null); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preIncrementAfterRowLock(ctx, increment)); } - } - return bypass ? result : null; + }); + return bypass ? res.get() : null; } /** @@ -1837,27 +1146,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public void postAppend(final Append append, final Result result) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postAppend(ctx, append, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postAppend(ctx, append, result); } - } + }); } /** @@ -1866,28 +1161,15 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result postIncrement(final Increment increment, Result result) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).postIncrement(ctx, increment, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(result); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.postIncrement(ctx, increment, res.get())); } - } - return result; + }); + return res.get(); } /** @@ -1897,31 +1179,15 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public RegionScanner preScannerOpen(final Scan scan) throws IOException { - boolean bypass = false; - RegionScanner s = null; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver)env.getInstance()).preScannerOpen(ctx, scan, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(null); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preScannerOpen(ctx, scan, res.get())); } - } - return bypass ? s : null; + }); + return bypass ? res.get() : null; } /** @@ -1931,30 +1197,15 @@ public class RegionCoprocessorHost */ public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan, final NavigableSet targetCols) throws IOException { - KeyValueScanner s = null; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan, - targetCols, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(null); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, res.get())); } - } - return s; + }); + return res.get(); } /** @@ -1964,28 +1215,15 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver)env.getInstance()).postScannerOpen(ctx, scan, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(s); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.postScannerOpen(ctx, scan, res.get())); } - } - return s; + }); + return res.get(); } /** @@ -1998,32 +1236,15 @@ public class RegionCoprocessorHost */ public Boolean preScannerNext(final InternalScanner s, final List results, final int limit) throws IOException { - boolean bypass = false; - boolean hasNext = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - hasNext = ((RegionObserver)env.getInstance()).preScannerNext(ctx, s, results, limit, - hasNext); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference hasNext = new ObjectReference(false); + boolean bypass = execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + hasNext.set(oserver.preScannerNext(ctx, s, results, limit, hasNext.get())); } - } - - return bypass ? hasNext : null; + }); + return bypass ? hasNext.get() : null; } /** @@ -2037,29 +1258,15 @@ public class RegionCoprocessorHost public boolean postScannerNext(final InternalScanner s, final List results, final int limit, boolean hasMore) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - hasMore = ((RegionObserver)env.getInstance()).postScannerNext(ctx, s, results, limit, - hasMore); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(hasMore); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.postScannerNext(ctx, s, results, limit, res.get())); } - } - return hasMore; + }); + return res.get(); } /** @@ -2074,125 +1281,61 @@ public class RegionCoprocessorHost */ public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow, final int offset, final short length) throws IOException { - boolean hasMore = true; // By default assume more rows there. - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow, - offset, length, hasMore); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference hasMore = new ObjectReference(true); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + hasMore.set(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, hasMore.get())); } - } - return hasMore; + }); + return hasMore.get(); } - + /** * @param s the scanner * @return true if default behavior should be bypassed, false otherwise * @exception IOException Exception */ public boolean preScannerClose(final InternalScanner s) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preScannerClose(ctx, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } - } - } - - return bypass; - } - - /** - * @exception IOException Exception - */ - public void postScannerClose(final InternalScanner s) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postScannerClose(ctx, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preScannerClose(ctx, s); } - } - } - - /** - * @param info - * @param logKey - * @param logEdit - * @return true if default behavior should be bypassed, false otherwise - * @throws IOException - */ - public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey, - final WALEdit logEdit) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preWALRestore(ctx, info, logKey, logEdit); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + }); + } + + /** + * @exception IOException Exception + */ + public void postScannerClose(final InternalScanner s) throws IOException { + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postScannerClose(ctx, s); } - } - return bypass; + }); + } + + /** + * @param info + * @param logKey + * @param logEdit + * @return true if default behavior should be bypassed, false otherwise + * @throws IOException + */ + public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey, + final WALEdit logEdit) throws IOException { + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preWALRestore(ctx, info, logKey, logEdit); + } + }); } /** @@ -2203,27 +1346,13 @@ public class RegionCoprocessorHost */ public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postWALRestore(ctx, info, logKey, logEdit); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postWALRestore(ctx, info, logKey, logEdit); } - } + }); } /** @@ -2232,30 +1361,13 @@ public class RegionCoprocessorHost * @throws IOException */ public boolean preBulkLoadHFile(final List> familyPaths) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preBulkLoadHFile(ctx, familyPaths); } - } - return bypass; + }); } /** @@ -2266,77 +1378,35 @@ public class RegionCoprocessorHost */ public boolean postBulkLoadHFile(final List> familyPaths, boolean hasLoaded) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - hasLoaded = ((RegionObserver)env.getInstance()).postBulkLoadHFile(ctx, familyPaths, - hasLoaded); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(hasLoaded); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.postBulkLoadHFile(ctx, familyPaths, res.get())); } - } - return hasLoaded; + }); + return res.get(); } public void postStartRegionOperation(final Operation op) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postStartRegionOperation(ctx, op); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postStartRegionOperation(ctx, op); } - } + }); } public void postCloseRegionOperation(final Operation op) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postCloseRegionOperation(ctx, op); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCloseRegionOperation(ctx, op); } - } + }); } /** @@ -2353,30 +1423,16 @@ public class RegionCoprocessorHost public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, final Reference r) throws IOException { - StoreFile.Reader reader = null; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - reader = ((RegionObserver) env.getInstance()).preStoreFileReaderOpen(ctx, fs, p, in, - size, cacheConf, r, reader); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference reader = new ObjectReference(); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + reader.set(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, + cacheConf, r, reader.get())); } - } - return reader; + }); + return reader.get(); } /** @@ -2392,154 +1448,155 @@ public class RegionCoprocessorHost */ public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, - final Reference r, StoreFile.Reader reader) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - reader = ((RegionObserver) env.getInstance()).postStoreFileReaderOpen(ctx, fs, p, in, - size, cacheConf, r, reader); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final Reference r, final StoreFile.Reader reader) throws IOException { + final ObjectReference res = new ObjectReference(reader); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, + cacheConf, r, res.get())); } - } - return reader; + }); + return res.get(); } public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation, final Cell oldCell, Cell newCell) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - newCell = ((RegionObserver) env.getInstance()).postMutationBeforeWAL(ctx, opType, - mutation, oldCell, newCell); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference newCellRef = new ObjectReference(newCell); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + newCellRef.set(oserver.postMutationBeforeWAL(ctx, opType, mutation, + oldCell, newCellRef.get())); } - } - return newCell; + }); + return newCellRef.get(); } public Message preEndpointInvocation(final Service service, final String methodName, Message request) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof EndpointObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - request = ((EndpointObserver) env.getInstance()).preEndpointInvocation(ctx, service, - methodName, request); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final ObjectReference res = new ObjectReference(request); + execOperation(new EndpointOperation() { + @Override + public void call(EndpointObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.preEndpointInvocation(ctx, service, methodName, res.get())); } - } - return request; + }); + return res.get(); } public void postEndpointInvocation(final Service service, final String methodName, final Message request, final Message.Builder responseBuilder) throws IOException { - ObserverContext ctx = null; + execOperation(new EndpointOperation() { + @Override + public void call(EndpointObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder); + } + }); + } + + public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException { + final ObjectReference res = new ObjectReference(tracker); + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + res.set(oserver.postInstantiateDeleteTracker(ctx, res.get())); + } + }); + return res.get(); + } + + public Map getCoprocessorExecutionStatistics() { + Map results = new HashMap(); for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof EndpointObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((EndpointObserver) env.getInstance()).postEndpointInvocation(ctx, service, - methodName, request, responseBuilder); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); + DescriptiveStatistics ds = new DescriptiveStatistics(); + if (env.getInstance() instanceof RegionObserver) { + for (Long time : env.getExecutionLatenciesNanos()) { + ds.addValue(time); } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; + // Ensures that web ui circumvents the display of NaN values when there are zero samples. + if (ds.getN() == 0) { + ds.addValue(0); } + results.put(env.getInstance().getClass().getSimpleName(), ds); } } + return results; + } + + private abstract class CoprocessorOperation { + public abstract void call(Coprocessor observer, + ObserverContext ctx) throws IOException; + public abstract boolean hasCall(Coprocessor observer); + public void postEnvCall(RegionEnvironment env) { } + } + + private abstract class RegionOperation extends CoprocessorOperation { + public abstract void call(RegionObserver observer, + ObserverContext ctx) throws IOException; + + public boolean hasCall(Coprocessor observer) { + return observer instanceof RegionObserver; + } + public void call(Coprocessor observer, ObserverContext ctx) + throws IOException { + call((RegionObserver)observer, ctx); + } } - public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException { + private abstract class EndpointOperation extends CoprocessorOperation { + public abstract void call(EndpointObserver observer, + ObserverContext ctx) throws IOException; + + public boolean hasCall(Coprocessor observer) { + return observer instanceof EndpointObserver; + } + + public void call(Coprocessor observer, ObserverContext ctx) + throws IOException { + call((EndpointObserver)observer, ctx); + } + } + + private boolean execOperation(final CoprocessorOperation operation) + throws IOException { + return execOperation(true, operation); + } + + private boolean execOperation(final boolean earlyExit, final CoprocessorOperation operation) + throws IOException { + boolean bypass = false; ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { + for (RegionEnvironment env: coprocessors) { + Coprocessor observer = env.getInstance(); + if (operation.hasCall(observer)) { long startTime = System.nanoTime(); ctx = ObserverContext.createAndPrepare(env, ctx); Thread currentThread = Thread.currentThread(); ClassLoader cl = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(env.getClassLoader()); - tracker = ((RegionObserver) env.getInstance()).postInstantiateDeleteTracker(ctx, - tracker); + operation.call(observer, ctx); } catch (Throwable e) { handleCoprocessorThrowable(env, e); } finally { currentThread.setContextClassLoader(cl); } env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { + bypass |= ctx.shouldBypass(); + if (earlyExit && ctx.shouldComplete()) { break; } } - } - return tracker; - } - public Map getCoprocessorExecutionStatistics() { - Map results = new HashMap(); - for (RegionEnvironment env : coprocessors) { - DescriptiveStatistics ds = new DescriptiveStatistics(); - if (env.getInstance() instanceof RegionObserver) { - for (Long time : env.getExecutionLatenciesNanos()) { - ds.addValue(time); - } - // Ensures that web ui circumvents the display of NaN values when there are zero samples. - if (ds.getN() == 0) { - ds.addValue(0); - } - results.put(env.getInstance().getClass().getSimpleName(), ds); - } + operation.postEnvCall(env); } - return results; + return bypass; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ObjectReference.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ObjectReference.java new file mode 100644 index 0000000..5f53005 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ObjectReference.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +public class ObjectReference { + private T object = null; + + public ObjectReference() { + this(null); + } + + public ObjectReference(T object) { + this.object = object; + } + + public T get() { + return object; + } + + public void set(T object) { + this.object = object; + } +} \ No newline at end of file