diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 5febcc8daa..a53ead411a 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -31,6 +31,7 @@ import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRI import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -364,7 +365,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.clientName = clientName; this.src = src; this.block = locatedBlock.getBlock(); - this.locations = locatedBlock.getLocations(); + this.locations = getLocations(locatedBlock); this.encryptor = encryptor; this.datanodeInfoMap = datanodeInfoMap; this.summer = summer; @@ -376,6 +377,20 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.streamSlowMonitor = streamSlowMonitor; } + private DatanodeInfo[] getLocations(LocatedBlock locatedBlock) { + DatanodeInfo[] datanodeInfos = null; + Method getLocationMethod; + Class clazz; + try { + clazz = Class.forName("org.apache.hadoop.hdfs.protocol.LocatedBlock"); + getLocationMethod = clazz.getMethod("getLocations"); + datanodeInfos = (DatanodeInfo[])getLocationMethod.invoke(locatedBlock); + } catch (Exception e) { + e.printStackTrace(); + } + return datanodeInfos; + } + @Override public void writeInt(int i) { buf.ensureWritable(4); diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 04b1370145..e27e1b6558 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -441,7 +441,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup, Class channelClass) { StorageType[] storageTypes = locatedBlock.getStorageTypes(); - DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); + DatanodeInfo[] datanodeInfos = getLocations(locatedBlock); boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT); @@ -555,7 +555,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass); for (int i = 0, n = futureList.size(); i < n; i++) { - DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i]; + DatanodeInfo datanodeInfo = getLocations(locatedBlock)[i]; try { datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo); } catch (Exception e) { @@ -613,6 +613,20 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { } } + private static DatanodeInfo[] getLocations(LocatedBlock locatedBlock) { + DatanodeInfo[] datanodeInfos = null; + Method getLocationMethod; + Class clazz; + try { + clazz = Class.forName("org.apache.hadoop.hdfs.protocol.LocatedBlock"); + getLocationMethod = clazz.getMethod("getLocations"); + datanodeInfos = (DatanodeInfo[])getLocationMethod.invoke(locatedBlock); + } catch (Exception e) { + e.printStackTrace(); + } + return datanodeInfos; + } + /** * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it * inside an {@link EventLoop}.