diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index 85694db..0bde71d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -80,1598 +80,795 @@ public class MasterCoprocessorHost } public boolean preCreateNamespace(final NamespaceDescriptor ns) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preCreateNamespace(ctx, ns); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preCreateNamespace(ctx, ns); } - } - return bypass; + }); } public void postCreateNamespace(final NamespaceDescriptor ns) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postCreateNamespace(ctx, ns); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCreateNamespace(ctx, ns); } - } + }); } public boolean preDeleteNamespace(final String namespaceName) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preDeleteNamespace(ctx, namespaceName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteNamespace(ctx, namespaceName); } - } - return bypass; + }); } public void postDeleteNamespace(final String namespaceName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postDeleteNamespace(ctx, namespaceName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteNamespace(ctx, namespaceName); } - } + }); } public boolean preModifyNamespace(final NamespaceDescriptor ns) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preModifyNamespace(ctx, ns); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preModifyNamespace(ctx, ns); } - } - return bypass; + }); } public void postModifyNamespace(final NamespaceDescriptor ns) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postModifyNamespace(ctx, ns); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postModifyNamespace(ctx, ns); } - } + }); } /* Implementation of hooks for invoking MasterObservers */ public void preCreateTable(final HTableDescriptor htd, final HRegionInfo[] regions) - throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preCreateTable(ctx, htd, regions); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preCreateTable(ctx, htd, regions); } - } + }); } public void postCreateTable(final HTableDescriptor htd, final HRegionInfo[] regions) - throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postCreateTable(ctx, htd, regions); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCreateTable(ctx, htd, regions); } - } + }); } public void preCreateTableHandler(final HTableDescriptor htd, final HRegionInfo[] regions) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preCreateTableHandler(ctx, htd, regions); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preCreateTableHandler(ctx, htd, regions); } - } + }); } public void postCreateTableHandler(final HTableDescriptor htd, final HRegionInfo[] regions) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postCreateTableHandler(ctx, htd, regions); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCreateTableHandler(ctx, htd, regions); } - } + }); } public void preDeleteTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preDeleteTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteTable(ctx, tableName); } - } + }); } public void postDeleteTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postDeleteTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteTable(ctx, tableName); } - } + }); } public void preDeleteTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preDeleteTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteTableHandler(ctx, tableName); } - } + }); } public void postDeleteTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postDeleteTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteTableHandler(ctx, tableName); } - } + }); } - public void preTruncateTable(TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - try { - ((MasterObserver)env.getInstance()).preTruncateTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } - if (ctx.shouldComplete()) { - break; - } + public void preTruncateTable(final TableName tableName) throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preTruncateTable(ctx, tableName); } - } + }); } - public void postTruncateTable(TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - try { - ((MasterObserver)env.getInstance()).postTruncateTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } - if (ctx.shouldComplete()) { - break; - } + public void postTruncateTable(final TableName tableName) throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postTruncateTable(ctx, tableName); } - } + }); } - public void preTruncateTableHandler(TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - try { - ((MasterObserver) env.getInstance()).preTruncateTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } - if (ctx.shouldComplete()) { - break; - } + public void preTruncateTableHandler(final TableName tableName) throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preTruncateTableHandler(ctx, tableName); } - } + }); } - public void postTruncateTableHandler(TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - try { - ((MasterObserver) env.getInstance()).postTruncateTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } - if (ctx.shouldComplete()) { - break; - } + public void postTruncateTableHandler(final TableName tableName) throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postTruncateTableHandler(ctx, tableName); } - } + }); } public void preModifyTable(final TableName tableName, final HTableDescriptor htd) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preModifyTable(ctx, tableName, htd); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preModifyTable(ctx, tableName, htd); } - } + }); } public void postModifyTable(final TableName tableName, final HTableDescriptor htd) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postModifyTable(ctx, tableName, htd); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postModifyTable(ctx, tableName, htd); } - } + }); } public void preModifyTableHandler(final TableName tableName, final HTableDescriptor htd) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preModifyTableHandler(ctx, tableName, htd); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preModifyTableHandler(ctx, tableName, htd); } - } + }); } public void postModifyTableHandler(final TableName tableName, final HTableDescriptor htd) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postModifyTableHandler(ctx, tableName, htd); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postModifyTableHandler(ctx, tableName, htd); } - } + }); } public boolean preAddColumn(final TableName tableName, final HColumnDescriptor column) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preAddColumn(ctx, tableName, column); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preAddColumn(ctx, tableName, column); } - } - return bypass; + }); } public void postAddColumn(final TableName tableName, final HColumnDescriptor column) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postAddColumn(ctx, tableName, column); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postAddColumn(ctx, tableName, column); } - } + }); } public boolean preAddColumnHandler(final TableName tableName, final HColumnDescriptor column) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preAddColumnHandler(ctx, tableName, column); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preAddColumnHandler(ctx, tableName, column); } - } - return bypass; + }); } public void postAddColumnHandler(final TableName tableName, final HColumnDescriptor column) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postAddColumnHandler(ctx, tableName, column); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postAddColumnHandler(ctx, tableName, column); } - } + }); } public boolean preModifyColumn(final TableName tableName, final HColumnDescriptor descriptor) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preModifyColumn(ctx, tableName, descriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preModifyColumn(ctx, tableName, descriptor); } - } - return bypass; + }); } public void postModifyColumn(final TableName tableName, final HColumnDescriptor descriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postModifyColumn(ctx, tableName, descriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postModifyColumn(ctx, tableName, descriptor); } - } + }); } public boolean preModifyColumnHandler(final TableName tableName, final HColumnDescriptor descriptor) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preModifyColumnHandler(ctx, tableName, descriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preModifyColumnHandler(ctx, tableName, descriptor); } - } - return bypass; + }); } public void postModifyColumnHandler(final TableName tableName, final HColumnDescriptor descriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postModifyColumnHandler(ctx, tableName, descriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postModifyColumnHandler(ctx, tableName, descriptor); } - } + }); } public boolean preDeleteColumn(final TableName tableName, final byte [] c) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preDeleteColumn(ctx, tableName, c); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteColumn(ctx, tableName, c); } - } - return bypass; + }); } public void postDeleteColumn(final TableName tableName, final byte [] c) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postDeleteColumn(ctx, tableName, c); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteColumn(ctx, tableName, c); } - } + }); } public boolean preDeleteColumnHandler(final TableName tableName, final byte[] c) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preDeleteColumnHandler(ctx, tableName, c); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteColumnHandler(ctx, tableName, c); } - } - return bypass; + }); } public void postDeleteColumnHandler(final TableName tableName, final byte[] c) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postDeleteColumnHandler(ctx, tableName, c); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteColumnHandler(ctx, tableName, c); } - } + }); } public void preEnableTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preEnableTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preEnableTable(ctx, tableName); } - } + }); } public void postEnableTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postEnableTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postEnableTable(ctx, tableName); } - } + }); } public void preEnableTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preEnableTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preEnableTableHandler(ctx, tableName); } - } + }); } public void postEnableTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postEnableTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postEnableTableHandler(ctx, tableName); } - } + }); } public void preDisableTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preDisableTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDisableTable(ctx, tableName); } - } + }); } public void postDisableTable(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postDisableTable(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDisableTable(ctx, tableName); } - } + }); } public void preDisableTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preDisableTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDisableTableHandler(ctx, tableName); } - } + }); } public void postDisableTableHandler(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postDisableTableHandler(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDisableTableHandler(ctx, tableName); } - } + }); } public boolean preMove(final HRegionInfo region, final ServerName srcServer, final ServerName destServer) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preMove(ctx, region, srcServer, destServer); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preMove(ctx, region, srcServer, destServer); } - } - return bypass; + }); } public void postMove(final HRegionInfo region, final ServerName srcServer, final ServerName destServer) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postMove(ctx, region, srcServer, destServer); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postMove(ctx, region, srcServer, destServer); } - } + }); } public boolean preAssign(final HRegionInfo regionInfo) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preAssign(ctx, regionInfo); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preAssign(ctx, regionInfo); } - } - return bypass; + }); } public void postAssign(final HRegionInfo regionInfo) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postAssign(ctx, regionInfo); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postAssign(ctx, regionInfo); } - } + }); } public boolean preUnassign(final HRegionInfo regionInfo, final boolean force) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preUnassign(ctx, regionInfo, force); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preUnassign(ctx, regionInfo, force); } - } - return bypass; + }); } public void postUnassign(final HRegionInfo regionInfo, final boolean force) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postUnassign(ctx, regionInfo, force); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postUnassign(ctx, regionInfo, force); } - } + }); } public void preRegionOffline(final HRegionInfo regionInfo) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preRegionOffline(ctx, regionInfo); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preRegionOffline(ctx, regionInfo); } - } + }); } public void postRegionOffline(final HRegionInfo regionInfo) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).postRegionOffline(ctx, regionInfo); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postRegionOffline(ctx, regionInfo); } - } + }); } public boolean preBalance() throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preBalance(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } - } - } - return bypass; - } - - public void postBalance(final List plans) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postBalance(ctx, plans); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } - } - } - } - - public boolean preBalanceSwitch(final boolean b) throws IOException { - boolean balance = b; - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - balance = ((MasterObserver)env.getInstance()).preBalanceSwitch(ctx, balance); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preBalance(ctx); } - } - return balance; + }); } - void postBalanceSwitch(final boolean oldValue, final boolean newValue) - throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postBalanceSwitch(ctx, oldValue, newValue); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + public void postBalance(final List plans) throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postBalance(ctx, plans); } - } + }); + } + + public boolean preBalanceSwitch(final boolean b) throws IOException { + return execOperationWithResult(new CoprocessorOperationWithResult(b) { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preBalanceSwitch(ctx, getResult())); + } + }); + } + + public void postBalanceSwitch(final boolean oldValue, final boolean newValue) + throws IOException { + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postBalanceSwitch(ctx, oldValue, newValue); + } + }); } public void preShutdown() throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preShutdown(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preShutdown(ctx); } - // invoke coprocessor stop method - shutdown(env); - } + @Override + public void postEnvCall(MasterEnvironment env) { + // invoke coprocessor stop method + shutdown(env); + } + }); } public void preStopMaster() throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preStopMaster(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preStopMaster(ctx); } - // invoke coprocessor stop method - shutdown(env); - } + @Override + public void postEnvCall(MasterEnvironment env) { + // invoke coprocessor stop method + shutdown(env); + } + }); } public void preMasterInitialization() throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preMasterInitialization(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preMasterInitialization(ctx); } - } + }); } public void postStartMaster() throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postStartMaster(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postStartMaster(ctx); } - } + }); } public void preSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preSnapshot(ctx, snapshot, hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void postSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postSnapshot(ctx, snapshot, hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void preCloneSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preCloneSnapshot(ctx, snapshot, - hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preCloneSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void postCloneSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postCloneSnapshot(ctx, snapshot, - hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCloneSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void preRestoreSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preRestoreSnapshot(ctx, snapshot, - hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preRestoreSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void postRestoreSnapshot(final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postRestoreSnapshot(ctx, snapshot, - hTableDescriptor); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postRestoreSnapshot(ctx, snapshot, hTableDescriptor); } - } + }); } public void preDeleteSnapshot(final SnapshotDescription snapshot) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).preDeleteSnapshot(ctx, snapshot); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDeleteSnapshot(ctx, snapshot); } - } + }); } public void postDeleteSnapshot(final SnapshotDescription snapshot) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postDeleteSnapshot(ctx, snapshot); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDeleteSnapshot(ctx, snapshot); } - } + }); } public boolean preGetTableDescriptors(final List tableNamesList, final List descriptors) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (MasterEnvironment env : coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver) env.getInstance()).preGetTableDescriptors(ctx, - tableNamesList, descriptors); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preGetTableDescriptors(ctx, tableNamesList, descriptors); } - } - return bypass; + }); } public void postGetTableDescriptors(final List descriptors) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((MasterObserver)env.getInstance()).postGetTableDescriptors(ctx, descriptors); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postGetTableDescriptors(ctx, descriptors); } - } + }); } public void preTableFlush(final TableName tableName) throws IOException { - ObserverContext ctx = null; - for (MasterEnvironment env: coprocessors) { - if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - try { - ((MasterObserver)env.getInstance()).preTableFlush(ctx, tableName); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preTableFlush(ctx, tableName); } - } + }); } public void postTableFlush(final TableName tableName) throws IOException { - ObserverContext ctx = null; + execOperation(new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postTableFlush(ctx, tableName); + } + }); + } + + private static abstract class CoprocessorOperation + extends ObserverContext { + public CoprocessorOperation() { + } + + public abstract void call(MasterObserver oserver, + ObserverContext ctx) throws IOException; + + public void postEnvCall(MasterEnvironment env) { + } + } + + private static abstract class CoprocessorOperationWithResult extends CoprocessorOperation { + private T result = null; + public CoprocessorOperationWithResult() { this(null); } + public CoprocessorOperationWithResult(final T result) { this.result = result; } + public void setResult(final T result) { this.result = result; } + public T getResult() { return this.result; } + } + + private T execOperationWithResult(final CoprocessorOperationWithResult ctx) + throws IOException { + execOperation(ctx); + return ctx.getResult(); + } + + private boolean execOperation(final CoprocessorOperation ctx) throws IOException { + boolean bypass = false; for (MasterEnvironment env: coprocessors) { if (env.getInstance() instanceof MasterObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); + ctx.prepare(env); + Thread currentThread = Thread.currentThread(); + ClassLoader cl = currentThread.getContextClassLoader(); try { - ((MasterObserver)env.getInstance()).postTableFlush(ctx, tableName); + currentThread.setContextClassLoader(env.getClassLoader()); + ctx.call((MasterObserver)env.getInstance(), ctx); } catch (Throwable e) { handleCoprocessorThrowable(env, e); + } finally { + currentThread.setContextClassLoader(cl); } + bypass |= ctx.shouldBypass(); if (ctx.shouldComplete()) { break; } } + ctx.postEnvCall(env); } + return bypass; } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 6329d47..62031b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -309,83 +309,46 @@ public class RegionCoprocessorHost * @throws IOException Signals that an I/O exception has occurred. */ public void preOpen() throws IOException { - - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preOpen(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preOpen(ctx); } - } - + }); } /** * Invoked after a region open */ public void postOpen() { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postOpen(ctx); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; + try { + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postOpen(ctx); } - } + }); + } catch (IOException e) { + LOG.warn(e); } - } /** * Invoked after log replay on region */ public void postLogReplay() { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postLogReplay(ctx); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; + try { + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postLogReplay(ctx); } - } + }); + } catch (IOException e) { + LOG.warn(e); } } @@ -394,25 +357,13 @@ public class RegionCoprocessorHost * @param abortRequested true if the server is aborting */ public void preClose(final boolean abortRequested) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preClose(ctx, abortRequested); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); + execOperation(false, new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preClose(ctx, abortRequested); } - } - + }); } /** @@ -420,26 +371,20 @@ public class RegionCoprocessorHost * @param abortRequested true if the server is aborting */ public void postClose(final boolean abortRequested) { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postClose(ctx, abortRequested); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env, e); - } finally { - currentThread.setContextClassLoader(cl); + try { + execOperation(false, new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postClose(ctx, abortRequested); } - env.offerExecutionLatency(System.nanoTime() - startTime); - } - shutdown(env); + public void postEnvCall(RegionEnvironment env) { + shutdown(env); + } + }); + } catch (IOException e) { + LOG.warn(e); } - } /** @@ -449,30 +394,14 @@ public class RegionCoprocessorHost public InternalScanner preCompactScannerOpen(final Store store, final List scanners, final ScanType scanType, final long earliestPutTs, final CompactionRequest request) throws IOException { - ObserverContext ctx = null; - InternalScanner s = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, - scanners, scanType, earliestPutTs, s, request); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType, + earliestPutTs, getResult(), request)); } - } - return s; + }); } /** @@ -486,31 +415,13 @@ public class RegionCoprocessorHost */ public boolean preCompactSelection(final Store store, final List candidates, final CompactionRequest request) throws IOException { - ObserverContext ctx = null; - boolean bypass = false; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates, - request); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preCompactSelection(ctx, store, candidates, request); } - } - return bypass; + }); } /** @@ -522,29 +433,17 @@ public class RegionCoprocessorHost */ public void postCompactSelection(final Store store, final ImmutableList selected, final CompactionRequest request) { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected, - request); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; + try { + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCompactSelection(ctx, store, selected, request); } - } + }); + } catch (IOException e) { + LOG.warn(e); } - } /** @@ -557,32 +456,13 @@ public class RegionCoprocessorHost */ public InternalScanner preCompact(final Store store, final InternalScanner scanner, final ScanType scanType, final CompactionRequest request) throws IOException { - ObserverContext ctx = null; - boolean bypass = false; - InternalScanner s = scanner; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preCompact(ctx, store, s, scanType, - request); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(false, new RegionOperationWithResult(scanner) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preCompact(ctx, store, getResult(), scanType, request)); } - } - return bypass ? null : s; + }); } /** @@ -594,58 +474,28 @@ public class RegionCoprocessorHost */ public void postCompact(final Store store, final StoreFile resultFile, final CompactionRequest request) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCompact(ctx, store, resultFile, request); } - } + }); } /** * Invoked before a memstore flush * @throws IOException */ - public InternalScanner preFlush(final Store store, final InternalScanner scanner) throws IOException { - ObserverContext ctx = null; - boolean bypass = false; - InternalScanner s = scanner; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver)env.getInstance()).preFlush(ctx, store, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + public InternalScanner preFlush(final Store store, final InternalScanner scanner) + throws IOException { + return execOperationWithResult(false, new RegionOperationWithResult(scanner) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preFlush(ctx, store, getResult())); } - } - return bypass ? null : s; + }); } /** @@ -653,27 +503,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preFlush() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preFlush(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preFlush(ctx); } - } + }); } /** @@ -683,30 +519,13 @@ public class RegionCoprocessorHost */ public InternalScanner preFlushScannerOpen(final Store store, final KeyValueScanner memstoreScanner) throws IOException { - ObserverContext ctx = null; - InternalScanner s = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preFlushScannerOpen(ctx, store, - memstoreScanner, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult())); } - } - return s; + }); } /** @@ -714,27 +533,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postFlush() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postFlush(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postFlush(ctx); } - } + }); } /** @@ -742,27 +547,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postFlush(final Store store, final StoreFile storeFile) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postFlush(ctx, store, storeFile); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postFlush(ctx, store, storeFile); } - } + }); } /** @@ -770,28 +561,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preSplit() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preSplit(ctx); } - } - + }); } /** @@ -799,28 +575,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preSplit(final byte[] splitRow) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preSplit(ctx, splitRow); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preSplit(ctx, splitRow); } - } - + }); } /** @@ -830,79 +591,34 @@ public class RegionCoprocessorHost * @throws IOException */ public void postSplit(final HRegion l, final HRegion r) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postSplit(ctx, l, r); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postSplit(ctx, l, r); } - } + }); } - public boolean preSplitBeforePONR(final byte[] splitKey, + public boolean preSplitBeforePONR(final byte[] splitKey, final List metaEntries) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preSplitBeforePONR(ctx, splitKey, metaEntries); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preSplitBeforePONR(ctx, splitKey, metaEntries); } - } - return bypass; + }); } public void preSplitAfterPONR() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preSplitAfterPONR(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preSplitAfterPONR(ctx); } - } + }); } /** @@ -910,27 +626,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preRollBackSplit() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preRollBackSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preRollBackSplit(ctx); } - } + }); } /** @@ -938,27 +640,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postRollBackSplit() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postRollBackSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postRollBackSplit(ctx); } - } + }); } /** @@ -966,27 +654,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postCompleteSplit() throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postCompleteSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCompleteSplit(ctx); } - } + }); } // RegionObserver support @@ -1000,30 +674,13 @@ public class RegionCoprocessorHost */ public boolean preGetClosestRowBefore(final byte[] row, final byte[] family, final Result result) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preGetClosestRowBefore(ctx, row, family, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preGetClosestRowBefore(ctx, row, family, result); } - } - return bypass; + }); } /** @@ -1034,27 +691,13 @@ public class RegionCoprocessorHost */ public void postGetClosestRowBefore(final byte[] row, final byte[] family, final Result result) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postGetClosestRowBefore(ctx, row, family, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postGetClosestRowBefore(ctx, row, family, result); } - } + }); } /** @@ -1064,30 +707,13 @@ public class RegionCoprocessorHost */ public boolean preGet(final Get get, final List results) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preGetOp(ctx, get, results); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preGetOp(ctx, get, results); } - } - return bypass; + }); } /** @@ -1096,28 +722,14 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public void postGet(final Get get, final List results) - throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postGetOp(ctx, get, results); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + throws IOException { + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postGetOp(ctx, get, results); } - } + }); } /** @@ -1127,31 +739,13 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public Boolean preExists(final Get get) throws IOException { - boolean bypass = false; - boolean exists = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - exists = ((RegionObserver)env.getInstance()).preExists(ctx, get, exists); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, new RegionOperationWithResult(false) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preExists(ctx, get, getResult())); } - } - return bypass ? exists : null; + }); } /** @@ -1162,28 +756,13 @@ public class RegionCoprocessorHost */ public boolean postExists(final Get get, boolean exists) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - exists = ((RegionObserver)env.getInstance()).postExists(ctx, get, exists); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult(exists) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.postExists(ctx, get, getResult())); } - } - return exists; + }); } /** @@ -1195,30 +774,13 @@ public class RegionCoprocessorHost */ public boolean prePut(final Put put, final WALEdit edit, final Durability durability) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).prePut(ctx, put, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.prePut(ctx, put, edit, durability); } - } - return bypass; + }); } /** @@ -1231,34 +793,15 @@ public class RegionCoprocessorHost * @exception IOException * Exception */ - public boolean prePrepareTimeStampForDeleteVersion(Mutation mutation, - Cell kv, byte[] byteNow, Get get) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()) - .prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, - byteNow, get); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, + final Cell kv, final byte[] byteNow, final Get get) throws IOException { + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get); } - } - return bypass; + }); } /** @@ -1269,27 +812,13 @@ public class RegionCoprocessorHost */ public void postPut(final Put put, final WALEdit edit, final Durability durability) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postPut(ctx, put, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postPut(ctx, put, edit, durability); } - } + }); } /** @@ -1301,30 +830,13 @@ public class RegionCoprocessorHost */ public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preDelete(ctx, delete, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preDelete(ctx, delete, edit, durability); } - } - return bypass; + }); } /** @@ -1335,29 +847,15 @@ public class RegionCoprocessorHost */ public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postDelete(ctx, delete, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postDelete(ctx, delete, edit, durability); } - } + }); } - + /** * @param miniBatchOp * @return true if default processing should be bypassed @@ -1365,31 +863,13 @@ public class RegionCoprocessorHost */ public boolean preBatchMutate( final MiniBatchOperationInProgress miniBatchOp) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preBatchMutate(ctx, miniBatchOp); } - } - - return bypass; + }); } /** @@ -1398,54 +878,25 @@ public class RegionCoprocessorHost */ public void postBatchMutate( final MiniBatchOperationInProgress miniBatchOp) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postBatchMutate(ctx, miniBatchOp); } - } + }); } public void postBatchMutateIndispensably( final MiniBatchOperationInProgress miniBatchOp, final boolean success) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postBatchMutateIndispensably(ctx, miniBatchOp, - success); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success); } - } + }); } /** @@ -1462,33 +913,15 @@ public class RegionCoprocessorHost public Boolean preCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Put put) - throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preCheckAndPut(ctx, row, family, qualifier, - compareOp, comparator, put, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + throws IOException { + return execOperationWithResult(true, new RegionOperationWithResult(false) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preCheckAndPut(ctx, row, family, qualifier, + compareOp, comparator, put, getResult())); } - } - return bypass ? result : null; + }); } /** @@ -1505,32 +938,14 @@ public class RegionCoprocessorHost public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Put put) throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preCheckAndPutAfterRowLock(ctx, row, - family, qualifier, compareOp, comparator, put, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, new RegionOperationWithResult(false) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier, + compareOp, comparator, put, getResult())); } - } - return bypass ? result : null; + }); } /** @@ -1545,31 +960,15 @@ public class RegionCoprocessorHost public boolean postCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Put put, - boolean result) - throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).postCheckAndPut(ctx, row, family, - qualifier, compareOp, comparator, put, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + boolean result) throws IOException { + return execOperationWithResult(new RegionOperationWithResult(result) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.postCheckAndPut(ctx, row, family, qualifier, + compareOp, comparator, put, getResult())); } - } - return result; + }); } /** @@ -1587,32 +986,14 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Delete delete) throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preCheckAndDelete(ctx, row, family, - qualifier, compareOp, comparator, delete, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, new RegionOperationWithResult(false) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preCheckAndDelete(ctx, row, family, + qualifier, compareOp, comparator, delete, getResult())); } - } - return bypass ? result : null; + }); } /** @@ -1629,32 +1010,14 @@ public class RegionCoprocessorHost public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Delete delete) throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preCheckAndDeleteAfterRowLock(ctx, row, - family, qualifier, compareOp, comparator, delete, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, new RegionOperationWithResult(false) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row, + family, qualifier, compareOp, comparator, delete, getResult())); } - } - return bypass ? result : null; + }); } /** @@ -1670,29 +1033,14 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Delete delete, boolean result) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).postCheckAndDelete(ctx, row, family, - qualifier, compareOp, comparator, delete, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult(result) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.postCheckAndDelete(ctx, row, family, + qualifier, compareOp, comparator, delete, getResult())); } - } - return result; + }); } /** @@ -1702,31 +1050,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preAppend(final Append append) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preAppend(ctx, append); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, new RegionOperationWithResult() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preAppend(ctx, append)); } - } - return bypass ? result : null; + }); } /** @@ -1736,31 +1066,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preAppendAfterRowLock(final Append append) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preAppendAfterRowLock(ctx, append); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, new RegionOperationWithResult() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preAppendAfterRowLock(ctx, append)); } - } - return bypass ? result : null; + }); } /** @@ -1770,31 +1082,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preIncrement(final Increment increment) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preIncrement(ctx, increment); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, new RegionOperationWithResult() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preIncrement(ctx, increment)); } - } - return bypass ? result : null; + }); } /** @@ -1804,31 +1098,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preIncrementAfterRowLock(final Increment increment) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preIncrementAfterRowLock(ctx, increment); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, new RegionOperationWithResult() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preIncrementAfterRowLock(ctx, increment)); } - } - return bypass ? result : null; + }); } /** @@ -1837,27 +1113,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public void postAppend(final Append append, final Result result) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postAppend(ctx, append, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postAppend(ctx, append, result); } - } + }); } /** @@ -1866,28 +1128,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result postIncrement(final Increment increment, Result result) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).postIncrement(ctx, increment, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult(result) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.postIncrement(ctx, increment, getResult())); } - } - return result; + }); } /** @@ -1897,31 +1144,13 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public RegionScanner preScannerOpen(final Scan scan) throws IOException { - boolean bypass = false; - RegionScanner s = null; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver)env.getInstance()).preScannerOpen(ctx, scan, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, new RegionOperationWithResult() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preScannerOpen(ctx, scan, getResult())); } - } - return bypass ? s : null; + }); } /** @@ -1931,30 +1160,13 @@ public class RegionCoprocessorHost */ public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan, final NavigableSet targetCols) throws IOException { - KeyValueScanner s = null; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan, - targetCols, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult())); } - } - return s; + }); } /** @@ -1964,28 +1176,13 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver)env.getInstance()).postScannerOpen(ctx, scan, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult(s) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.postScannerOpen(ctx, scan, getResult())); } - } - return s; + }); } /** @@ -1998,32 +1195,13 @@ public class RegionCoprocessorHost */ public Boolean preScannerNext(final InternalScanner s, final List results, final int limit) throws IOException { - boolean bypass = false; - boolean hasNext = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - hasNext = ((RegionObserver)env.getInstance()).preScannerNext(ctx, s, results, limit, - hasNext); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, new RegionOperationWithResult(false) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preScannerNext(ctx, s, results, limit, getResult())); } - } - - return bypass ? hasNext : null; + }); } /** @@ -2037,29 +1215,13 @@ public class RegionCoprocessorHost public boolean postScannerNext(final InternalScanner s, final List results, final int limit, boolean hasMore) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - hasMore = ((RegionObserver)env.getInstance()).postScannerNext(ctx, s, results, limit, - hasMore); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult(hasMore) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.postScannerNext(ctx, s, results, limit, getResult())); } - } - return hasMore; + }); } /** @@ -2074,90 +1236,41 @@ public class RegionCoprocessorHost */ public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow, final int offset, final short length) throws IOException { - boolean hasMore = true; // By default assume more rows there. - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow, - offset, length, hasMore); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult(true) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult())); } - } - return hasMore; + }); } - + /** * @param s the scanner * @return true if default behavior should be bypassed, false otherwise * @exception IOException Exception */ public boolean preScannerClose(final InternalScanner s) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preScannerClose(ctx, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preScannerClose(ctx, s); } - } - - return bypass; + }); } /** * @exception IOException Exception */ public void postScannerClose(final InternalScanner s) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postScannerClose(ctx, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postScannerClose(ctx, s); } - } + }); } /** @@ -2169,30 +1282,13 @@ public class RegionCoprocessorHost */ public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preWALRestore(ctx, info, logKey, logEdit); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preWALRestore(ctx, info, logKey, logEdit); } - } - return bypass; + }); } /** @@ -2203,27 +1299,13 @@ public class RegionCoprocessorHost */ public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postWALRestore(ctx, info, logKey, logEdit); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postWALRestore(ctx, info, logKey, logEdit); } - } + }); } /** @@ -2232,30 +1314,13 @@ public class RegionCoprocessorHost * @throws IOException */ public boolean preBulkLoadHFile(final List> familyPaths) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preBulkLoadHFile(ctx, familyPaths); } - } - return bypass; + }); } /** @@ -2266,77 +1331,33 @@ public class RegionCoprocessorHost */ public boolean postBulkLoadHFile(final List> familyPaths, boolean hasLoaded) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - hasLoaded = ((RegionObserver)env.getInstance()).postBulkLoadHFile(ctx, familyPaths, - hasLoaded); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult(hasLoaded) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult())); } - } - return hasLoaded; + }); } public void postStartRegionOperation(final Operation op) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postStartRegionOperation(ctx, op); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postStartRegionOperation(ctx, op); } - } + }); } public void postCloseRegionOperation(final Operation op) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postCloseRegionOperation(ctx, op); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCloseRegionOperation(ctx, op); } - } + }); } /** @@ -2353,30 +1374,13 @@ public class RegionCoprocessorHost public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, final Reference r) throws IOException { - StoreFile.Reader reader = null; - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - reader = ((RegionObserver) env.getInstance()).preStoreFileReaderOpen(ctx, fs, p, in, - size, cacheConf, r, reader); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult())); } - } - return reader; + }); } /** @@ -2392,154 +1396,185 @@ public class RegionCoprocessorHost */ public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, - final Reference r, StoreFile.Reader reader) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - reader = ((RegionObserver) env.getInstance()).postStoreFileReaderOpen(ctx, fs, p, in, - size, cacheConf, r, reader); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + final Reference r, final StoreFile.Reader reader) throws IOException { + return execOperationWithResult(new RegionOperationWithResult(reader) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult())); } - } - return reader; + }); } public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation, final Cell oldCell, Cell newCell) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - newCell = ((RegionObserver) env.getInstance()).postMutationBeforeWAL(ctx, opType, - mutation, oldCell, newCell); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new RegionOperationWithResult(newCell) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult())); } - } - return newCell; + }); } public Message preEndpointInvocation(final Service service, final String methodName, Message request) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof EndpointObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - request = ((EndpointObserver) env.getInstance()).preEndpointInvocation(ctx, service, - methodName, request); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(new EndpointOperationWithResult(request) { + @Override + public void call(EndpointObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult())); } - } - return request; + }); } public void postEndpointInvocation(final Service service, final String methodName, final Message request, final Message.Builder responseBuilder) throws IOException { - ObserverContext ctx = null; + execOperation(new EndpointOperation() { + @Override + public void call(EndpointObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder); + } + }); + } + + public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException { + return execOperationWithResult(new RegionOperationWithResult(tracker) { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + setResult(oserver.postInstantiateDeleteTracker(ctx, getResult())); + } + }); + } + + public Map getCoprocessorExecutionStatistics() { + Map results = new HashMap(); for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof EndpointObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((EndpointObserver) env.getInstance()).postEndpointInvocation(ctx, service, - methodName, request, responseBuilder); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); + DescriptiveStatistics ds = new DescriptiveStatistics(); + if (env.getInstance() instanceof RegionObserver) { + for (Long time : env.getExecutionLatenciesNanos()) { + ds.addValue(time); } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; + // Ensures that web ui circumvents the display of NaN values when there are zero samples. + if (ds.getN() == 0) { + ds.addValue(0); } + results.put(env.getInstance().getClass().getSimpleName(), ds); } } + return results; + } + private static abstract class CoprocessorOperation + extends ObserverContext { + public abstract void call(Coprocessor observer, + ObserverContext ctx) throws IOException; + public abstract boolean hasCall(Coprocessor observer); + public void postEnvCall(RegionEnvironment env) { } } - public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException { - ObserverContext ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { + private static abstract class RegionOperation extends CoprocessorOperation { + public abstract void call(RegionObserver observer, + ObserverContext ctx) throws IOException; + + public boolean hasCall(Coprocessor observer) { + return observer instanceof RegionObserver; + } + + public void call(Coprocessor observer, ObserverContext ctx) + throws IOException { + call((RegionObserver)observer, ctx); + } + } + + private static abstract class RegionOperationWithResult extends RegionOperation { + private T result = null; + public RegionOperationWithResult() { this(null); } + public RegionOperationWithResult(final T result) { this.result = result; } + public void setResult(final T result) { this.result = result; } + public T getResult() { return this.result; } + } + + private static abstract class EndpointOperation extends CoprocessorOperation { + public abstract void call(EndpointObserver observer, + ObserverContext ctx) throws IOException; + + public boolean hasCall(Coprocessor observer) { + return observer instanceof EndpointObserver; + } + + public void call(Coprocessor observer, ObserverContext ctx) + throws IOException { + call((EndpointObserver)observer, ctx); + } + } + + private static abstract class EndpointOperationWithResult extends EndpointOperation { + private T result = null; + public EndpointOperationWithResult() { this(null); } + public EndpointOperationWithResult(final T result) { this.result = result; } + public void setResult(final T result) { this.result = result; } + public T getResult() { return this.result; } + } + + private boolean execOperation(final CoprocessorOperation ctx) + throws IOException { + return execOperation(true, ctx); + } + + private T execOperationWithResult(final RegionOperationWithResult ctx) + throws IOException { + execOperation(true, ctx); + return ctx.getResult(); + } + + private T execOperationWithResult(final boolean ifBypass, + final RegionOperationWithResult ctx) throws IOException { + boolean bypass = execOperation(true, ctx); + return bypass == ifBypass ? ctx.getResult() : null; + } + + private T execOperationWithResult(final EndpointOperationWithResult ctx) + throws IOException { + execOperation(true, ctx); + return ctx.getResult(); + } + + private T execOperationWithResult(final boolean ifBypass, + final EndpointOperationWithResult ctx) throws IOException { + boolean bypass = execOperation(true, ctx); + return bypass == ifBypass ? ctx.getResult() : null; + } + + private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx) + throws IOException { + boolean bypass = false; + for (RegionEnvironment env: coprocessors) { + Coprocessor observer = env.getInstance(); + if (ctx.hasCall(observer)) { long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); + ctx.prepare(env); Thread currentThread = Thread.currentThread(); ClassLoader cl = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(env.getClassLoader()); - tracker = ((RegionObserver) env.getInstance()).postInstantiateDeleteTracker(ctx, - tracker); + ctx.call(observer, ctx); } catch (Throwable e) { handleCoprocessorThrowable(env, e); } finally { currentThread.setContextClassLoader(cl); } env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { + bypass |= ctx.shouldBypass(); + if (earlyExit && ctx.shouldComplete()) { break; } } - } - return tracker; - } - public Map getCoprocessorExecutionStatistics() { - Map results = new HashMap(); - for (RegionEnvironment env : coprocessors) { - DescriptiveStatistics ds = new DescriptiveStatistics(); - if (env.getInstance() instanceof RegionObserver) { - for (Long time : env.getExecutionLatenciesNanos()) { - ds.addValue(time); - } - // Ensures that web ui circumvents the display of NaN values when there are zero samples. - if (ds.getN() == 0) { - ds.addValue(0); - } - results.put(env.getInstance().getClass().getSimpleName(), ds); - } + ctx.postEnvCall(env); } - return results; + return bypass; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java index 5052f2a..c20f9a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -54,170 +54,118 @@ public class RegionServerCoprocessorHost extends } public void preStop(String message) throws IOException { - ObserverContext ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).preStopRegionServer(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException { + oserver.preStopRegionServer(ctx); } - // invoke coprocessor stop method - shutdown(env); - } + @Override + public void postEnvCall(RegionServerEnvironment env) { + // invoke coprocessor stop method + shutdown(env); + } + }); } public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).preMerge(ctx, regionA, regionB); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException { + oserver.preMerge(ctx, regionA, regionB); } - } - return bypass; + }); } public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion) throws IOException { - ObserverContext ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).postMerge(ctx, regionA, regionB, mergedRegion); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException { + oserver.postMerge(ctx, regionA, regionB, mergedRegion); } - } + }); } public boolean preMergeCommit(final HRegion regionA, final HRegion regionB, final @MetaMutationAnnotation List metaEntries) throws IOException { - boolean bypass = false; - ObserverContext ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).preMergeCommit(ctx, regionA, regionB, - metaEntries); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException { + oserver.preMergeCommit(ctx, regionA, regionB, metaEntries); } - } - return bypass; + }); } public void postMergeCommit(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion) throws IOException { - ObserverContext ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).postMergeCommit(ctx, regionA, regionB, - mergedRegion); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException { + oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion); } - } + }); } public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException { - ObserverContext ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).preRollBackMerge(ctx, regionA, regionB); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException { + oserver.preRollBackMerge(ctx, regionA, regionB); } - } + }); } public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException { - ObserverContext ctx = null; - for (RegionServerEnvironment env : coprocessors) { + execOperation(new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException { + oserver.postRollBackMerge(ctx, regionA, regionB); + } + }); + } + + private static abstract class CoprocessorOperation + extends ObserverContext { + public CoprocessorOperation() { + } + + public abstract void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException; + + public void postEnvCall(RegionServerEnvironment env) { + } + } + + private boolean execOperation(final CoprocessorOperation ctx) throws IOException { + boolean bypass = false; + for (RegionServerEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); + ctx.prepare(env); Thread currentThread = Thread.currentThread(); ClassLoader cl = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).postRollBackMerge(ctx, regionA, regionB); + ctx.call((RegionServerObserver)env.getInstance(), ctx); } catch (Throwable e) { handleCoprocessorThrowable(env, e); } finally { currentThread.setContextClassLoader(cl); } + bypass |= ctx.shouldBypass(); if (ctx.shouldComplete()) { break; } } + ctx.postEnvCall(env); } + return bypass; } /**