Hadoop HDFS / HDFS-232

Cross-system causal tracing within Hadoop

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      Much of Hadoop's behavior is client-driven, with clients responsible for contacting individual datanodes to read and write data, as well as dividing up work for map and reduce tasks. In a large deployment with many concurrent users, identifying the effects of individual clients on the infrastructure is a challenge. The use of data pipelining in HDFS and Map/Reduce makes it hard to follow the effects of a given client request through the system.

      This proposal is to instrument the HDFS, IPC, and Map/Reduce layers of Hadoop with X-Trace. X-Trace is an open-source framework for capturing causality of events in a distributed system. It can correlate operations making up a single user request, even if those operations span multiple machines. As an example, you could use X-Trace to follow an HDFS write operation as it is pipelined through intermediate nodes. Additionally, you could trace a single Map/Reduce job and see how it is decomposed into lower-layer HDFS operations.

      Matei Zaharia and Andy Konwinski initially integrated X-Trace with a local copy of the 0.14 release, and I've brought that code up to release 0.17. Performing the integration involves modifying the IPC protocol, the inter-datanode protocol, and some data structures in the Map/Reduce layer to include 20 bytes of tracing metadata. With release 0.18, the generated traces could be collected with Chukwa.

      I've attached some example traces of HDFS and IPC layers from the 0.17 patch to this JIRA issue.

      More information about X-Trace is available from http://www.x-trace.net/ as well as in a paper that appeared at NSDI 2007, available online at http://www.usenix.org/events/nsdi07/tech/fonseca.html

      1. HADOOP-4049.2-ipc.patch
        33 kB
        George Porter
      2. HADOOP-4049.3-ipc.patch
        33 kB
        George Porter
      3. HADOOP-4049.4-rpc.patch
        29 kB
        George Porter
      4. HADOOP-4049.6-rpc.patch
        25 kB
        George Porter
      5. HADOOP-4049.7-rpc.patch
        26 kB
        George Porter
      6. HADOOP-4049.patch
        29 kB
        George Porter
      7. multiblockread.png
        29 kB
        George Porter
      8. multiblockwrite.png
        83 kB
        George Porter

        Activity

        Tom White added a comment -

        Unfortunately this patch doesn't apply to the latest trunk anymore, and needs regenerating.

        Yinzhi Cao added a comment -

        There are always errors when I try to use the patch. I tried 0.18.0, 0.18.1, and 0.17.0. Can anyone tell me the reason? Thanks.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12391172/HADOOP-4049.7-rpc.patch
        against trunk revision 700628.

        +1 @author. The patch does not contain any @author tags.

        +1 tests included. The patch appears to include 9 new or modified tests.

        +1 javadoc. The javadoc tool did not generate any warning messages.

        +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        -1 findbugs. The patch appears to introduce 2 new Findbugs warnings.

        +1 Eclipse classpath. The patch retains Eclipse classpath integrity.

        -1 core tests. The patch failed core unit tests.

        +1 contrib tests. The patch passed contrib unit tests.

        Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3410/testReport/
        Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3410/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3410/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3410/console

        This message is automatically generated.

        Doug Cutting added a comment -

        +1 This looks good to me. Have you run any RPC-intensive benchmarks?

        George Porter added a comment -

        This patch includes:

        • Each RPC Invocation has a CURRENT_VERSION field, which write() sends and readFields() reads back. An IOException is thrown if the two ends of the RPC exchange have different versions.
        • NullRPCInstrumentation has been removed
        • RPCInstrumentation has been changed from an abstract class to a concrete class, and its 6 instrumentation API methods now have 'no-op' default behaviors.
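
        The version check in the first bullet can be sketched as follows. This is a minimal illustration, not the code from the patch: the class name VersionedInvocation, the methodName field, the helper methods, and the value of CURRENT_VERSION are assumptions made for the example, and plain java.io streams stand in for Hadoop's Writable machinery.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical stand-in for the RPC Invocation; not the class from the patch. */
class VersionedInvocation {
  /** Illustrative RPC-layer version constant (value chosen for the example). */
  static final byte CURRENT_VERSION = 1;

  String methodName = "";

  /** Sends the version byte first, then the payload. */
  void write(DataOutput out) throws IOException {
    out.writeByte(CURRENT_VERSION);
    out.writeUTF(methodName);
  }

  /** Reads the version byte and rejects a peer that speaks another version. */
  void readFields(DataInput in) throws IOException {
    byte remote = in.readByte();
    if (remote != CURRENT_VERSION) {
      throw new IOException("RPC version mismatch: expected "
          + CURRENT_VERSION + ", got " + remote);
    }
    methodName = in.readUTF();
  }

  /** Helper for the example: serializes an invocation to a byte array. */
  static byte[] toBytes(VersionedInvocation inv) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    inv.write(new DataOutputStream(buf));
    return buf.toByteArray();
  }

  /** Helper for the example: parses an invocation, applying the version check. */
  static VersionedInvocation fromBytes(byte[] bytes) throws IOException {
    VersionedInvocation inv = new VersionedInvocation();
    inv.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
    return inv;
  }
}
```

        Putting the version byte first means a mismatched peer fails with a clear IOException before any of the remaining fields are interpreted.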
        Doug Cutting added a comment -

        This looks much cleaner.

        We need to detect incompatibility. We currently version the transport (Client/Server) and application (protocol) layers, but this changes the RPC layer, between those, for which we do not currently have a version checking mechanism. Newer clients attempting to talk to older servers and vice versa will fail in unexpected ways.

        The best way I can think to fix this is to increment the transport version (Server#CURRENT_VERSION, even though that layer has not changed), and add an RPC-layer version field in Invocation, so that if we ever change the RPC layer again we'll be able to. This can just be a single-byte field in Invocation that write() sends and readFields() checks against a constant value. Then, should we ever change Invocation and/or RPCResponse again we'll be able to detect it. Does that make sense?

        Finally, is NullRPCInstrumentation needed? You seem to accept null as a value for an instrumentation, so I don't see why this class is needed. If we do need such a class, then it would be shorter to replace RPCInstrumentation's abstract methods with {}. Less code is almost always better with me!
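
        A minimal sketch of the "replace the abstract methods with {}" idea, for illustration only. The hook names follow the instrumentation points listed elsewhere in this issue, but the class names and the parameter lists here are assumptions, not the patch's actual signatures.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class: every hook has an empty body, so no separate "null" subclass is needed. */
class RPCInstrumentationSketch {
  void clientStartCall() {}
  void serverReceiveCall() {}
  void serverSendResponse() {}
  void clientReceiveResponse() {}
  void remoteException(Throwable t) {}
  void ipcFailure(Throwable t) {}
}

/** A tracer that only cares about failures overrides just those two hooks. */
class FailureLoggingInstrumentation extends RPCInstrumentationSketch {
  final List<String> events = new ArrayList<>();

  @Override
  void remoteException(Throwable t) {
    events.add("remote: " + t.getMessage());
  }

  @Override
  void ipcFailure(Throwable t) {
    events.add("ipc: " + t.getMessage());
  }
}
```

        With no-op defaults, the base class itself serves as the "do nothing" implementation, and subclasses stay short because they override only the hooks they use.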

        George Porter added a comment -

        This patch includes:

        • The RPC layer now instantiates an ipc.Client of type RPCResponse, which is a subclass of ObjectWritable. This return type optionally contains path state.
        • When RPC instrumentation is disabled, the overhead is significantly less than before, since no objects are created.
        • There are no longer any ThreadLocal variables
        Doug Cutting added a comment -

        We should be able to do this without replicating the logic of ObjectWritable#writeObject and #readObject. Why can't you just invoke super.writeObject() and super.readObject() from the implementations? If your ObjectWritable subclass was called RPCResponse, and RPCs always returned instances of this, regardless of whether instrumentation is enabled, and it has a pathState field, but the pathState might be null, would that make things simpler?
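
        The shape being suggested can be sketched like this. It is an assumption-laden illustration: ValueWritable is a made-up stand-in for ObjectWritable so the example runs without Hadoop on the classpath, and the presence flag plus length prefix is one possible wire layout, not the one used by any patch here.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/** Hypothetical stand-in for ObjectWritable: carries a single string value. */
class ValueWritable {
  String value = "";

  void write(DataOutput out) throws IOException {
    out.writeUTF(value);
  }

  void readFields(DataInput in) throws IOException {
    value = in.readUTF();
  }
}

/** Response type that is always returned; pathState is null when instrumentation is off. */
class RPCResponseSketch extends ValueWritable {
  byte[] pathState; // may be null

  @Override
  void write(DataOutput out) throws IOException {
    super.write(out);                     // reuse the parent's serialization
    out.writeBoolean(pathState != null);  // presence flag for the optional field
    if (pathState != null) {
      out.writeInt(pathState.length);
      out.write(pathState);
    }
  }

  @Override
  void readFields(DataInput in) throws IOException {
    super.readFields(in);                 // reuse the parent's deserialization
    if (in.readBoolean()) {
      pathState = new byte[in.readInt()];
      in.readFully(pathState);
    } else {
      pathState = null;
    }
  }
}
```

        Because the subclass delegates to super for the value itself, it adds only the optional field and does not duplicate the parent's serialization logic.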

        George Porter added a comment -

        This patch incorporates Doug's comments, and:

        • leaves ipc.Client and ipc.Server alone
        • is restricted to instrumentation within RPC.java
        • returns path state back to the Caller by returning a subclass of ObjectWritable with associated path state
        • includes more comprehensive testing code
        Doug Cutting added a comment -

        Returning a two-element array is nasty. Why not define a two-field class? Moreover, I don't think either Client.java or Server.java needs to be altered. On the client side, Invocation can be extended to have a pathState, and an ObjectWritable subclass can be defined with a pathState for the return value. RPC.Server#call can be extended to call instrumentation, in about the same places where logging is done there currently. Does that make sense?

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12390675/HADOOP-4049.3-ipc.patch
        against trunk revision 697306.

        +1 @author. The patch does not contain any @author tags.

        +1 tests included. The patch appears to include 6 new or modified tests.

        +1 javadoc. The javadoc tool did not generate any warning messages.

        +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        +1 findbugs. The patch does not introduce any new Findbugs warnings.

        -1 core tests. The patch failed core unit tests.

        -1 contrib tests. The patch failed contrib unit tests.

        Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3348/testReport/
        Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3348/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3348/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3348/console

        This message is automatically generated.

        George Porter added a comment -

        Getting the Jira back to a consistent state

        George Porter added a comment -

        This patch incorporates feedback from the online discussion, resulting in:

        • 6 instrumentation points (4 for normal calls, 2 for exceptional conditions)
        • Support for those points setting and querying path-state kept along the path
        • wrapping the IPC path state into an object with its own type
        • More cleanly separating the RPC layer from the IPC layer. The IPC call() method has been extended to include per-call state.
        • Test cases
        George Porter added a comment -

        Wrong patch version submitted. Sigh.

        George Porter added a comment -

        This patch incorporates feedback from the online discussion, resulting in:

        • 6 instrumentation points (4 for normal calls, 2 for exceptional conditions)
        • Support for those points setting and querying path-state kept along the path
        • wrapping the IPC path state into an object with its own type
        • More cleanly separating the RPC layer from the IPC layer. The IPC call() method has been extended to include per-call state.
        • Test cases
        George Porter added a comment -

        [This updated version of the patch fixes a failing JUnit test--sorry for the inconvenience]

        This patch addresses the comments that have been suggested here, including:

        • wrapping the IPC path state into an object with its own type
        • caching a reference to the instrumentation into local variables
        • various fixes such as javadocs and indentation

        I also tried to more cleanly separate the RPC layer from the IPC layer. Because a given IPC client can be shared by multiple threads, I set it up so that you can pass a path state object into the Client.call() method. Once the call is complete, modified headers are returned back to the RPC layer. This associates path state with a particular IPC call, while at the same time no longer requiring the IPC code to reach up into the RPC layer. Feedback is appreciated.

        George Porter added a comment -

        This patch addresses the comments that have been suggested here, including:

        • wrapping the IPC path state into an object with its own type
        • caching a reference to the instrumentation into local variables
        • various fixes such as javadocs and indentation

        I also tried to more cleanly separate the RPC layer from the IPC layer. Because a given IPC client can be shared by multiple threads, I set it up so that you can pass a path state object into the Client.call() method. Once the call is complete, modified headers are returned back to the RPC layer. This associates path state with a particular IPC call, while at the same time no longer requiring the IPC code to reach up into the RPC layer. Feedback is appreciated.

        Next up: the HDFS layer.

        George Porter added a comment -

        Thank you for the feedback. I'm implementing it now, but wanted to bring up an issue related to making Client and Server more extensible.

        I like the idea of providing an addHeader() and setHeader() interface to the Client and Server, but my understanding of the code is that a single Client can multiplex remote procedure calls from multiple Caller threads (I think this is exhibited in TestRPC.java lines 277 through 284). So I think that we would need to associate headers with individual Call objects, rather than Client objects.

        If so, one way of doing that would be to create an IPCHeaders object that contains the headers you want to include with the IPC call, and then modify
        Client.call(Writable param, InetSocketAddress addr, UserGroupInformation ticket) to become
        Client.call(Writable param, InetSocketAddress addr, UserGroupInformation ticket, IPCHeaders headers)

        This way we could associate those headers with its Call on the Client side, and include them on the wire when we call into the server. In terms of the return path, instead of returning a Writable, it could return a pair consisting of a Writable and an IPCHeaders (which were set by the server).
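
        To make the per-call association concrete, here is a rough sketch. Everything in it is hypothetical: SketchClient simulates the server in-process instead of using the network, String stands in for Writable, the addr and ticket parameters are omitted, and CallResult and TRACE_KEY are names invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-call header bag, keyed by int. */
class IPCHeaders {
  /** Illustrative header key for tracing metadata. */
  static final int TRACE_KEY = 1;

  private final Map<Integer, byte[]> headers = new HashMap<>();

  void setHeader(int key, byte[] value) {
    headers.put(key, value.clone());
  }

  /** Returns a copy of the header value, or null if the key is absent. */
  byte[] getHeader(int key) {
    byte[] value = headers.get(key);
    return value == null ? null : value.clone();
  }
}

/** Pair returned to the caller: the result plus the headers set by the server. */
class CallResult<T> {
  final T value;
  final IPCHeaders headers;

  CallResult(T value, IPCHeaders headers) {
    this.value = value;
    this.headers = headers;
  }
}

/** Toy client: headers travel with each call, not with the shared client object. */
class SketchClient {
  CallResult<String> call(String param, IPCHeaders requestHeaders) {
    // A real client would attach requestHeaders to its Call and put them on
    // the wire; here the "server" is simulated by echoing the trace header.
    IPCHeaders responseHeaders = new IPCHeaders();
    byte[] trace = requestHeaders.getHeader(IPCHeaders.TRACE_KEY);
    if (trace != null) {
      responseHeaders.setHeader(IPCHeaders.TRACE_KEY, trace);
    }
    return new CallResult<>("echo:" + param, responseHeaders);
  }
}
```

        Since the headers are an argument and a return value rather than client fields, two threads sharing one client cannot overwrite each other's headers.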

        Doug Cutting added a comment -

        > Doug, I guess you meant two spaces, no ?

        Oops. Yes, I was off by a factor of two.

        Hemanth Yamijala added a comment -

        > indentation is 8 spaces in places. it should be four.

        Doug, I guess you meant two spaces, no ?

        Doug Cutting added a comment -

        A few comments on the patch:

        • indentation is 8 spaces in places. it should be four.
        • IPCInstrumentation needs javadoc on all public & protected elements
        • in RPC.java, instead of calling 'IPCInstrumentation.getInstrumentation()' everywhere, should we just call it once, saving the value in a field or variable, and then use that?
        • RPC is layered on Client and Server, yet the instrumentation code bleeds into both layers. It would be better if it were implemented entirely at one level or the other. I like the idea of adding extensible headers to Client and Server. Perhaps that could be added as a feature that RPC uses to implement instrumentation. So the additions to Client and Server might be something more generic like 'addHeader(int key, byte[] value)' and 'getHeader(int key, byte[] value)'.
        • you included an XTraceInstrumentation implementation in the patch, but that probably belongs in a different package, no?
        Ari Rabkin added a comment -

        George: Looks good, and thanks so much for doing this. A few thoughts, if I may kibitz instead of coding:

        • You have raw byte[] arrays sprinkled around, e.g., in Server. Can we hide these behind abstract classes? Something like "RPCCallInstrumentationState".
        • Why do we need the thread-local stuff in IPCInstrumentation? Couldn't it be pushed down to the concrete XTraceIPCInstrumentation?
        • Hadoop's io libraries have utility functions for serializing variable-length ints (hadoop.io.WritableUtils.writeVInt and readVInt). I think Owen is pushing using them for the serialized length field we send with RPC optional fields.
        Show
        Ari Rabkin added a comment - George: Looks good, and thanks so much for doing this. A few thoughts, if I may kibitz instead of coding: You have raw byte[] arrays sprinkled around, e.g., in Server. Can we hide these behind abstract classes? Something like "RPCCallInstrumentationState". Why do we need the thread-local stuff in IPCInstrumentation? Couldn't it be pushed down to the concrete XTraceIPCInstrumentation? Hadoop's io libraries have these utility functions for serializing variable length ints (hadoop.io.WritableUtils.writeVInt and readVInt) . I think Owen is pushing using them for the serialized length field we send with RPC optional fields.
        Hide
        George Porter added a comment -

        This patch is an implementation of the instrumentation API for the RPC layer.

        It includes an abstract instrumentation class for the RPC layer, as well as two concrete implementations: a "null" implementation that does nothing, and a "test" implementation that is used for unit testing. An X-Trace implementation of the API will be attached shortly.

        Each successful RPC call activates the following four instrumentation points in order:
        1) clientStartCall()
        2) serverReceiveCall()
        3) serverSendResponse()
        4) clientReceiveResponse()

        An instrumentation point can set "path state" using setPathState(). This state follows the RPC call and is available to the remaining instrumentation points via getPathState().

        There are also two instrumentation points for erroneous conditions: remoteException() is activated if the code running on the server throws an exception, and ipcFailure() is called if there is an underlying failure in the network causing RPC to fail.
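
        As a rough illustration of the four points and of path state following a call, consider the sketch below. It is not the patch's code: the class names are invented, the hooks take no arguments, the hop counter is an arbitrary use of the path state, and everything runs in one process, whereas in the real design the state would cross the network between points 1 and 2 and between points 3 and 4.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical instrumentation that records each hook and counts hops in the path state. */
class RecordingInstrumentation {
  private byte[] pathState;                    // null until the first hook sets it
  final List<String> log = new ArrayList<>();

  void setPathState(byte[] state) { pathState = state; }
  byte[] getPathState() { return pathState; }

  void clientStartCall()       { record("clientStartCall"); }
  void serverReceiveCall()     { record("serverReceiveCall"); }
  void serverSendResponse()    { record("serverSendResponse"); }
  void clientReceiveResponse() { record("clientReceiveResponse"); }

  /** Logs the hook name and bumps a one-byte hop counter kept in the path state. */
  private void record(String hook) {
    byte[] state = getPathState();
    int hops = (state == null) ? 0 : state[0];
    setPathState(new byte[] {(byte) (hops + 1)});
    log.add(hook);
  }
}

/** Drives one successful call through the four points, in order. */
class SuccessfulCallDriver {
  static void run(RecordingInstrumentation inst) {
    inst.clientStartCall();        // client, before the request is sent
    inst.serverReceiveCall();      // server, before processing begins
    inst.serverSendResponse();     // server, after processing
    inst.clientReceiveResponse();  // client, after the response arrives
  }
}
```

        Each hook reads the state left by the previous one, which is the property a tracing framework relies on to link the four events into a single causal chain.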

        George Porter added a comment - - edited

        The following are some notes on creating a pluggable path-based tracing framework. They are the result of conversations between Ari, Andy, George, Rodrigo, Owen, Mac, Arun, and others. This will hopefully serve as a starting point for further discussion.

        Path-based tracing consists of two different operations: propagation and instrumentation. Propagation is responsible for keeping "path state" needed to reconstruct the event graph flowing along the datapath. Path state must be maintained within a single thread or JVM, and communicated across network protocols. You can think of path state as a small set of bytes that follow a given operation such as a DFS write or an RPC call through each of the machines involved in that call. Instrumentation is responsible for creating events. These events can make use of the path state, and can also modify that path state. Instrumentation points are called at key places in the code, such as when an RPC client is about to invoke a call across the network, or when that call is received by the server.

        At first, there will be three abstract instrumentation classes:

        • HDFSInstrumentation
        • IPCInstrumentation
        • MapReduceInstrumentation

        Specific path-based tracing frameworks will create subclasses (e.g., XTraceIPCInstrumentation). There would be one abstract path state class, PathState. This state is very small, and consists of a type, a length, and a few bytes. Concrete subclasses can impose semantics on the bytes.

        A proposal for the IPC instrumentation abstract class is:

        public abstract class IPCInstrumentation {
            /** Called when the client initiates an RPC call with invocation 'i' */
            public abstract void clientStartCall(Invocation i);
        
            /** Called when the server receives the RPC call, before it begins processing */
            public abstract void serverReceiveCall();
        
            /** Called when the server has finished processing, and is about to return the result 'retvalue' */
            public abstract void serverSendResponse(Writable retvalue);
        
            /** Called when the client receives the response from the server */
            public abstract void clientReceiveResponse();
        
            /** Called when the RPC invocation 'i' throws an exception 't' on the server */
            public abstract void remoteException(Invocation i, Throwable t);
        
            /** Called when a failure occurs reaching the server (e.g., a network failure).
              * 'i' is the invocation that failed, and 't' is the failure exception */
            public abstract void ipcInfrastructureFailure(Invocation i, Throwable t);
        }
        

        In terms of propagation, the IPCInstrumentation class will hold a reference to a PathState object and, if that reference is non-null, will include it in the IPC protocol. The IPCInstrumentation methods can get, set, and modify that PathState object.
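The non-null rule can be sketched as a presence flag in front of each call. The code below is a simplified stand-in rather than the proposed class: it keeps only two of the hooks, replaces Invocation with a plain method name, models the path state as a byte array, and invents the encodeCall/decodeCall helpers and the wire layout purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for the proposed IPCInstrumentation base class. */
abstract class IpcInstrumentationSketch {
    protected byte[] pathState; // null means the call is not being traced

    abstract void clientStartCall(String method);

    abstract void serverReceiveCall();

    /** Client side: fire the hook, then prepend the path state only if one is set. */
    final byte[] encodeCall(String method, byte[] payload) {
        clientStartCall(method); // the hook may create or modify pathState
        byte[] state = pathState;
        int header = (state == null) ? 1 : 2 + state.length;
        byte[] wire = new byte[header + payload.length];
        if (state != null) {
            wire[0] = 1;                     // presence flag
            wire[1] = (byte) state.length;   // assumes at most 255 bytes of state
            System.arraycopy(state, 0, wire, 2, state.length);
        }
        System.arraycopy(payload, 0, wire, header, payload.length);
        return wire;
    }

    /** Server side: restore the path state (if any), fire the hook, return the payload. */
    final byte[] decodeCall(byte[] wire) {
        int header = 1;
        pathState = null;
        if (wire[0] == 1) {
            int len = wire[1] & 0xFF;
            pathState = Arrays.copyOfRange(wire, 2, 2 + len);
            header = 2 + len;
        }
        serverReceiveCall();
        return Arrays.copyOfRange(wire, header, wire.length);
    }
}

/** Hypothetical subclass that records events and advances the last state byte per call. */
final class RecordingInstrumentation extends IpcInstrumentationSketch {
    final List<String> events = new ArrayList<>();

    @Override
    void clientStartCall(String method) {
        if (pathState != null && pathState.length > 0) {
            pathState[pathState.length - 1]++; // stand-in for "advance the event ID"
        }
        events.add("clientStartCall " + method);
    }

    @Override
    void serverReceiveCall() {
        events.add("serverReceiveCall traced=" + (pathState != null));
    }
}
```

With this layout an untraced call costs one extra byte on the wire, which matters given that the IPC layer is on the critical path.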

        Ari Rabkin added a comment -

        I spent a little while looking at this and exploring options a month or so ago.

        The main conclusion from that work is that it's worth solving this problem generally, and sharing work with other instrumentation tools. At the points where you'd like to emit causal log entries, there are other things you'd like to do as well, so the right way to do this is probably to have an abstract "Instrumentation" class that can be subclassed to do useful things like send X-Trace reports.

        There are already instrumentation classes for JobTracker and TaskTracker (via HADOOP-3772), though you may find it necessary to add more methods. Also, there was some concern about the performance impact of tracing at the IPC level, since it's on the critical path.

        George Porter added a comment -

        An example trace of a multi-block write. The first block is written to worker3, which pipelines it to worker1, which pipelines it to worker2. The second block is written to worker1, which pipelines it to worker3, which pipelines it to worker2.

        George Porter added a comment -

        A trace of a multi-block read from the client to two datanodes (first worker3, then worker1).


          People

          • Assignee:
            Unassigned
            Reporter:
            George Porter
          • Votes:
            2
            Watchers:
            30
