Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java (revision 1484901) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java (working copy) @@ -20,6 +20,7 @@ import junit.framework.Assert; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -28,6 +29,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; + public class TestRecordFactory { @Test @@ -53,4 +58,17 @@ } } + @Test + public void testSerDeser() throws Exception { + RecordFactory factory = RecordFactoryPBImpl.get(); + ApplicationId id1 = factory.newRecordInstance(ApplicationId.class); + id1.setId(12345); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + factory.write(id1, baos); + baos.close(); + InputStream is = new ByteArrayInputStream(baos.toByteArray()); + ApplicationId id2 = factory.read(is, ApplicationId.class); + Assert.assertEquals(id1, id2); + } + } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RecordFactoryPBImpl.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RecordFactoryPBImpl.java (revision 1484901) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RecordFactoryPBImpl.java (working copy) @@ -18,11 +18,16 @@ package org.apache.hadoop.yarn.factories.impl.pb; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import com.google.protobuf.Message; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -76,6 +81,38 @@ } } + @SuppressWarnings("unchecked") + private Message getProto(T record) throws IOException { + try { + Method method = record.getClass().getMethod("getProto"); + return (Message) method.invoke(record); + } catch (Exception ex) { + throw new IOException(ex); + } + } + + @Override + public void write(T record, OutputStream output) throws IOException { + Message message = getProto(record); + message.writeTo(output); + } + + @Override + @SuppressWarnings("unchecked") + public T read(InputStream input, Class clazz) throws IOException { + try { + Message message = getProto(newRecordInstance(clazz)); + Message.Builder builder = message.newBuilderForType(); + Message proto = builder.mergeFrom(input).build(); + Class protoClass = proto.getClass(); + Class pbImplClass = localConf.getClassByName(getPBImplClassName(clazz)); + Constructor recordConstructor = pbImplClass.getConstructor(protoClass); + return (T) recordConstructor.newInstance(proto); + } catch (Exception ex) { + throw new IOException(ex); + } + } + private String getPBImplClassName(Class clazz) { String srcPackagePart = getPackageName(clazz); String srcClassName = getClassName(clazz); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RecordFactory.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RecordFactory.java (revision 1484901) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/RecordFactory.java (working copy) @@ -20,7 +20,15 @@ import org.apache.hadoop.yarn.YarnException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; public interface RecordFactory { + public T newRecordInstance(Class clazz) throws YarnException; + + public void write(T record, OutputStream output) throws IOException; + + public T read(InputStream input, Class clazz) throws IOException; }