Chukwa / CHUKWA-4

Collectors don't finish writing .done datasink from last .chukwa datasink when stopped using bin/stop-collectors

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Data Collection
    • Labels:
      None
    • Environment:

      I am running on our local cluster. This is a Linux machine that I also run a Hadoop cluster from.

      Description

      When I use start-collectors, it creates the datasink as expected and writes to it as per normal, i.e. it writes to the .chukwa file, and rollovers work fine when it renames the .chukwa file to .done. However, when I use bin/stop-collectors to shut down the running collector, it leaves a .chukwa file in the HDFS file system. I am not sure whether this is a valid sink or not, but I think the collector should gracefully clean up the datasink and rename it to .done before exiting.

      Attachments

      1. CHUKWA-4.2.patch
        14 kB
        Ahmed Fathalla
      2. ASF.LICENSE.NOT.GRANTED--CHUKWA-4.patch
        14 kB
        Ahmed Fathalla

        Activity

        Ahmed Fathalla added a comment -

        I have replicated this issue on my local development machine. When stop-collectors.sh runs, the following is logged to the collector log

        2010-03-26 14:10:12,988 WARN Shutdown SeqFileWriter - cannot rename dataSink file:/chukwa/logs/201026140940728_ahmedlaptop_421d1571279a5d08887ffc.chukwa
        java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:226)
        at org.apache.hadoop.hdfs.DFSClient.access$600(DFSClient.java:67)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:3219)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:3188)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:61)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:86)
        at org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter.close(SeqFileWriter.java:318)
        at org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter.close(SocketTeeWriter.java:267)
        at org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter.close(PipelineStageWriter.java:46)
        at org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector.destroy(ServletCollector.java:227)
        at org.mortbay.jetty.servlet.ServletHolder.destroyInstance(ServletHolder.java:315)
        at org.mortbay.jetty.servlet.ServletHolder.doStop(ServletHolder.java:286)
        at org.mortbay.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:64)
        at org.mortbay.jetty.servlet.ServletHandler.doStop(ServletHandler.java:170)
        at org.mortbay.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:64)
        at org.mortbay.jetty.handler.HandlerWrapper.doStop(HandlerWrapper.java:142)
        at org.mortbay.jetty.servlet.SessionHandler.doStop(SessionHandler.java:124)
        at org.mortbay.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:64)
        at org.mortbay.jetty.handler.HandlerWrapper.doStop(HandlerWrapper.java:142)
        at org.mortbay.jetty.handler.ContextHandler.doStop(ContextHandler.java:569)
        at org.mortbay.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:64)
        at org.mortbay.jetty.handler.HandlerWrapper.doStop(HandlerWrapper.java:142)
        at org.mortbay.jetty.Server.doStop(Server.java:281)
        at org.mortbay.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:64)
        at org.mortbay.jetty.Server$ShutdownHookThread.run(Server.java:559)

        The line that throws the exception is this.currentOutputStr.close(); in the close() method of SeqFileWriter.java

        try {
          lock.acquire();
          if (this.currentOutputStr != null) {
            this.currentOutputStr.close();
          }
          if (ENABLE_ROTATION_ON_CLOSE)
            if (bytesThisRotate > 0)
              fs.rename(currentPath, new Path(currentFileName + ".done"));
            else
              fs.delete(currentPath, false);
        } catch (Throwable e) {
          log.warn("cannot rename dataSink file:" + currentPath, e);
        } finally {
          lock.release();
        }

        It seems that the FileSystem is somehow closed before we attempt to close currentOutputStr. Does anyone have recommendations on how this can be resolved?

        Jerome Boulon added a comment -

        This is because HDFS closes the FileSystem on kill -TERM (via its own shutdown hook).
        The workaround is to remove the HDFS shutdown hook and invoke it after the collector has closed, but it's just a workaround.
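        A minimal sketch of that idea, assuming a Hadoop release that honours the fs.automatic.close property; the closeCollector() call is a hypothetical placeholder for whatever closes the SeqFileWriter and renames the last .chukwa file to .done:

        import java.io.IOException;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;

        public class CollectorShutdownSketch {
          public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            // Keep HDFS from closing the FileSystem in its own JVM shutdown hook.
            conf.setBoolean("fs.automatic.close", false);
            final FileSystem fs = FileSystem.get(conf);

            Runtime.getRuntime().addShutdownHook(new Thread() {
              public void run() {
                try {
                  closeCollector(); // hypothetical: close the writer and rename .chukwa to .done
                  fs.close();       // only now is it safe to close the FileSystem
                } catch (IOException e) {
                  // nothing useful can be done this late in shutdown
                }
              }
            });

            // ... normal collector operation ...
          }

          private static void closeCollector() {
            // placeholder for the real collector shutdown logic
          }
        }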

        James Seigel added a comment -

        It isn't always a valid sink file... I can vouch for that. I've seen cases where I renamed those to .done and then they couldn't be processed properly; a readFully or something similar fails.

        Cheers

        Jerome Boulon added a comment -

        I'm exploring a couple of options:

        1- Use the local file system instead of HDFS in the first place. This is working pretty well for me, since my "HDFS" is S3 and I cannot really write directly to it.
        2- At startup time, any .chukwa file whose last access time is older than two rotation periods cannot be a file that a collector is still writing to. So we can:
        2.1- Open the file and try to read it; if that fails we have to do 2.2, so always doing 2.2 directly may be the better option.
        2.2- Open the file, create a new SequenceFile, and copy the data from one to the other, then close the new file and rename it. To avoid being trapped by the same issue in case of a kill -9 or crash, we need to use another extension such as .recover, and at start time we can delete any .recover files.

        Also, at this point there's no valid way to process an invalid SequenceFile in Hadoop, so paying the price up front when we know something could be wrong seems better than crashing an M/R job with 300 input files and then having to parse them all.

        Ahmed Fathalla added a comment -

        I like the idea of identifying incomplete .chukwa files at startup and copying them into valid SequenceFiles. I think we can go ahead and implement 2.2 directly and skip the overhead of 2.1.

        I don't quite understand the idea of the .recover file. How will this help us if a kill -9 happens while we are copying the incomplete .chukwa file to the newly created SequenceFile?

        Jerome Boulon added a comment -

        If we use the same extension for the recovery file and the collector is killed while writing to the SequenceFile, then you will no longer be able to identify the initial SequenceFile: you'll have two .chukwa files, one of them an unclosed file and the other the one you were recovering into.
        Having a specific .recover extension solves that problem, since you can simply ignore it at restart time.

        Ahmed Fathalla added a comment -

        I am working on a patch to solve this bug; however, I am unsure whether we need to change code in the CollectorStub or ServletCollector classes. Does anyone have guidance on what should be done to implement the changes suggested in previous comments?

        Jerome Boulon added a comment -

        You could create a helper class responsible for doing the recovery, then call this helper class from the LocalToRemoteHdfsMover class.
        For the SeqFileWriter it will be more difficult, since your file is visible to all collectors on HDFS, so you would need to restrict the list of files per collector. A better option for HDFS could be to have another daemon responsible for doing the recovery (wake up every xx minutes, do an ls *.chukwa, check the last access time, and recover whenever there's something to process).
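        A rough sketch of what such a scanning daemon could look like; the interval, age threshold, and recover() hook are illustrative placeholders, and modification time is used here as a stand-in for last access time:

        import java.io.IOException;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class SinkRecoveryDaemon extends Thread {
          private static final long SCAN_INTERVAL_MS = 10 * 60 * 1000L; // "every xx minutes", illustrative
          private static final long MIN_AGE_MS = 2 * 5 * 60 * 1000L;    // roughly two rotation periods, illustrative

          private final FileSystem fs;
          private final Path sinkDir;

          public SinkRecoveryDaemon(Configuration conf, Path sinkDir) throws IOException {
            this.fs = FileSystem.get(conf);
            this.sinkDir = sinkDir;
            setDaemon(true);
          }

          public void run() {
            while (true) {
              try {
                // "ls *.chukwa" and pick files old enough that no collector can still be writing them
                FileStatus[] candidates = fs.globStatus(new Path(sinkDir, "*.chukwa"));
                long now = System.currentTimeMillis();
                if (candidates != null) {
                  for (FileStatus stat : candidates) {
                    if (now - stat.getModificationTime() > MIN_AGE_MS) {
                      recover(stat.getPath()); // hypothetical hook for the copy-and-rename recovery
                    }
                  }
                }
                Thread.sleep(SCAN_INTERVAL_MS);
              } catch (InterruptedException e) {
                return;
              } catch (Exception e) {
                // log and keep scanning on the next interval
              }
            }
          }

          private void recover(Path staleSink) {
            // placeholder: copy readable chunks into a new SequenceFile, then rename to .done
          }
        }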

        Ahmed Fathalla added a comment -

        I am working on a helper class that could be called before the moveFile method in LocalToRemoteHdfsMover. The purpose of this helper is to copy the contents of the (possibly corrupt) .chukwa file into a valid sink file. Here is the class I created:

        public class CopySequenceFile {
          static Logger log = Logger.getLogger(LocalWriter.class);
          private static SequenceFile.Writer seqFileWriter = null;
          private static SequenceFile.Reader seqFileReader = null;
          private static FSDataOutputStream newOutputStr = null;

          public static void main(String args[]) {
          }

          public static void createValidSequenceFile(Configuration conf, String originalFileDir,
                                                     String originalFileName, FileSystem localFs) {
            try {
              String originalCompleteDir = originalFileDir + originalFileName;
              Path originalPath = new Path(originalCompleteDir);
              int extensionIndex = originalFileName.indexOf(".chukwa", 0);
              String newFileName = originalFileName.substring(0, extensionIndex) + ".recover";
              String newCompleteDir = originalFileDir + newFileName;
              Path newPath = new Path(newCompleteDir);

              newOutputStr = localFs.create(newPath);
              seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
                  ChukwaArchiveKey.class, ChunkImpl.class,
                  SequenceFile.CompressionType.NONE, null);
              seqFileReader = new SequenceFile.Reader(localFs, originalPath, conf);

              System.out.println("key class name is " + seqFileReader.getKeyClassName());
              System.out.println("value class name is " + seqFileReader.getValueClassName());
              ChukwaArchiveKey key = new ChukwaArchiveKey();
              ChunkImpl evt = ChunkImpl.getBlankChunk();

              while (seqFileReader.next(key, evt)) {
                seqFileWriter.append(key, evt);
              }

              // Remove the original .chukwa file
              localFs.delete(originalPath, false);
              // Rename the recovered file from .recover back to .chukwa
              localFs.rename(newPath, originalPath);
              seqFileReader.close();
              seqFileWriter.close();
              newOutputStr.close();
            } catch (Exception e) {
              log.warn("Error while copying .chukwa file to valid sink file", e);
              e.printStackTrace();
            }
          }
        }

        However, when the createValidSequenceFile method is called, a ChecksumException is thrown while seqFileReader.next(key, evt) executes. (This is the same exception that was causing moveFile to fail in LocalToRemoteHdfsMover; it seems to happen when the reader reaches the corrupt chunk.)

        My question is: is there a way to identify that the chunk being read is corrupt and, if so, skip reading it and close the sink file being copied to?

        Ari Rabkin added a comment -

        I don't think you need to identify the bad chunk. Why not just move the "close new file and rename things" logic into the exception handler? I would split the handler into a ChecksumException case, where you recover by closing and renaming the file, and everything else, which you should log the way you do now.

        Jerome Boulon added a comment -

        The exception will be thrown on this line:

        while (seqFileReader.next(key, evt)) { seqFileWriter.append(key, evt); }

        so you can add a first try/catch block around it, then add another one around the rename/delete.
        Also, don't delete the .chukwa file in the first place.

        Steps:
        1- Rename .recover to .recoverDone
        2- delete .chukwa
        3- rename .recoverDone to .done

        At startup time, you should (a sketch of these steps follows this list):
        1- delete any .recover files
        2.0- if you have a .recoverDone and .chukwa with the same name, then delete the .chukwa file (Repeat until there's no more .chukwa file that matches this rule)
        2.1- Rename all .recoverDone to .done
        3- process .chukwa files
        4- send .done files
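
        The following is a minimal sketch of that startup-time cleanup, assuming all sink files live in a single directory; the class and method names are illustrative, not the committed implementation:

        import java.io.IOException;
        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class StartupRecoverySketch {
          public static void cleanAtStartup(FileSystem fs, Path sinkDir) throws IOException {
            // 1- delete any .recover files (a crash happened mid-copy; the .chukwa source is still there)
            FileStatus[] recovers = fs.globStatus(new Path(sinkDir, "*.recover"));
            if (recovers != null) {
              for (FileStatus s : recovers) {
                fs.delete(s.getPath(), false);
              }
            }

            // 2.0- if a .recoverDone and a .chukwa share the same base name, the copy finished
            //      but the original was never deleted: delete the leftover .chukwa file
            FileStatus[] recoverDones = fs.globStatus(new Path(sinkDir, "*.recoverDone"));
            if (recoverDones != null) {
              for (FileStatus s : recoverDones) {
                String base = s.getPath().getName().replace(".recoverDone", "");
                Path leftoverChukwa = new Path(sinkDir, base + ".chukwa");
                if (fs.exists(leftoverChukwa)) {
                  fs.delete(leftoverChukwa, false);
                }
                // 2.1- rename the .recoverDone file to .done
                fs.rename(s.getPath(), new Path(sinkDir, base + ".done"));
              }
            }

            // 3- remaining .chukwa files are then recovered (copied into valid .done files)
            // 4- .done files are sent as usual
          }
        }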

        Ahmed Fathalla added a comment -

        I made the changes Jerome recommended, tried it, and it seems to be working correctly. Please take a look and let me know if you have any comments.

        public class CopySequenceFile {
          static Logger log = Logger.getLogger(LocalWriter.class);
          private static SequenceFile.Writer seqFileWriter = null;
          private static SequenceFile.Reader seqFileReader = null;
          private static FSDataOutputStream newOutputStr = null;

          public static void main(String args[]) {
          }

          public static void createValidSequenceFile(Configuration conf, String originalFileDir,
                                                     String originalFileName, FileSystem localFs) {
            try {
              String originalCompleteDir = originalFileDir + originalFileName;
              Path originalPath = new Path(originalCompleteDir);
              int extensionIndex = originalFileName.indexOf(".chukwa", 0);
              String recoverDoneFileName = originalFileName.substring(0, extensionIndex) + ".recoverDone";
              String recoverDoneDir = originalFileDir + recoverDoneFileName;
              Path recoverDonePath = new Path(recoverDoneDir);
              String recoverFileName = originalFileName.substring(0, extensionIndex) + ".recover";
              String recoverDir = originalFileDir + recoverFileName;
              Path recoverPath = new Path(recoverDir);
              String doneFileName = originalFileName.substring(0, extensionIndex) + ".done";
              String doneDir = originalFileDir + doneFileName;
              Path donePath = new Path(doneDir);

              newOutputStr = localFs.create(recoverPath);
              seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
                  ChukwaArchiveKey.class, ChunkImpl.class,
                  SequenceFile.CompressionType.NONE, null);
              seqFileReader = new SequenceFile.Reader(localFs, originalPath, conf);

              System.out.println("key class name is " + seqFileReader.getKeyClassName());
              System.out.println("value class name is " + seqFileReader.getValueClassName());
              ChukwaArchiveKey key = new ChukwaArchiveKey();
              ChunkImpl evt = ChunkImpl.getBlankChunk();
              try {
                while (seqFileReader.next(key, evt)) {
                  seqFileWriter.append(key, evt);
                }
              } catch (ChecksumException e) {
                // The exception occurs when we read a bad chunk while copying
                log.warn("Encountered Bad Chunk while copying .chukwa file, continuing", e);
              }

              try {
                localFs.rename(recoverPath, recoverDonePath); // Rename the destination file from .recover to .recoverDone
                localFs.delete(originalPath, false);          // Delete the original .chukwa file
                localFs.rename(recoverDonePath, donePath);    // Rename .recoverDone to .done
              } catch (Exception e) {
                log.warn("Error occured while renaming .recoverDone to .recover or deleting .chukwa", e);
                e.printStackTrace();
              }

              seqFileReader.close();
              seqFileWriter.close();
              newOutputStr.close();
            } catch (Exception e) {
              log.warn("Error during .chukwa file recovery", e);
              e.printStackTrace();
            }
          }
        }

        Ari Rabkin added a comment -

        Ahmed, looks promising.

        Can you generate a .patch file and attach it to this JIRA? Also, can you produce a unit test to make sure this functionality doesn't get inadvertently broken by some other change, in the future?

        Ahmed Fathalla added a comment -

        This patch contains a fix for corrupt sink files created locally. I've created a new class CopySequenceFile which copies the corrupt .chukwa file to a valid .done file.

        The code for recovering a failed copy attempt is included in the cleanup() method of LocalToRemoteHdfsMover and follows Jerome's suggestions. I have also created a unit test that creates a sink file, converts it into a .done file and validates that the .done file was created and the .chukwa file removed.
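
        For reference, a minimal, illustrative sketch of what such a test might look like (the actual TestCopySequenceFile in the patch may differ; the temp directory and file names are assumptions, and a fuller test would append real ChunkImpl records rather than using an empty sink file):

        import junit.framework.TestCase;
        import org.apache.hadoop.chukwa.ChukwaArchiveKey;
        import org.apache.hadoop.chukwa.ChunkImpl;
        import org.apache.hadoop.chukwa.util.CopySequenceFile;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataOutputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.SequenceFile;

        public class TestCopySequenceFileSketch extends TestCase {
          public void testRecoveryProducesDoneFile() throws Exception {
            Configuration conf = new Configuration();
            FileSystem localFs = FileSystem.getLocal(conf);
            String dir = System.getProperty("java.io.tmpdir") + "/";
            String sinkName = "testSink.chukwa";
            Path sinkPath = new Path(dir + sinkName);

            // Create a header-only .chukwa sink file, standing in for an abandoned datasink.
            FSDataOutputStream out = localFs.create(sinkPath);
            SequenceFile.Writer writer = SequenceFile.createWriter(conf, out,
                ChukwaArchiveKey.class, ChunkImpl.class,
                SequenceFile.CompressionType.NONE, null);
            writer.close();
            out.close();

            // Run the recovery and check that a .done file replaced the .chukwa file.
            CopySequenceFile.createValidSequenceFile(conf, dir, sinkName, localFs);
            assertTrue(localFs.exists(new Path(dir + "testSink.done")));
            assertFalse(localFs.exists(sinkPath));
          }
        }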

        I have tested this solution several times and it seems to be working. However, I have hit a rare case where recovery fails because the following exception is thrown while reading from the .chukwa file and writing to the .done file:

        2010-04-12 07:56:47,538 WARN LocalToRemoteHdfsMover CopySequenceFile - Error during .chukwa file recovery
        java.io.EOFException
        at java.io.DataInputStream.readFully(DataInputStream.java:180)
        at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
        at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1930)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1830)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1876)
        at org.apache.hadoop.chukwa.util.CopySequenceFile.createValidSequenceFile(CopySequenceFile.java:80)
        at org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalToRemoteHdfsMover.cleanup(LocalToRemoteHdfsMover.java:185)
        at org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalToRemoteHdfsMover.run(LocalToRemoteHdfsMover.java:215)

        This seemed to happen when recovering from a .chukwa file that had just been created before the collector crashed (the file was about 200 KB), so I guess the file may contain no actual data and should simply be removed. I would appreciate it if you could point out how we can deal with this situation.
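
        One hedged way to handle that case, extending the handling already in createValidSequenceFile: catch EOFException around the copy loop just like ChecksumException, and treat whatever was copied before the truncation as the recoverable content. The fragment below is a drop-in for the copy loop only (seqFileReader, seqFileWriter, key, evt and log are the variables already defined in that method, and it additionally needs java.io.EOFException and org.apache.hadoop.fs.ChecksumException imports); it is a sketch rather than the committed fix:

        int copiedChunks = 0;
        try {
          while (seqFileReader.next(key, evt)) {
            seqFileWriter.append(key, evt);
            copiedChunks++;
          }
        } catch (ChecksumException e) {
          // A bad chunk was read while copying; stop at the last good record.
          log.warn("Encountered bad chunk while copying .chukwa file, continuing", e);
        } catch (EOFException e) {
          // The source file was cut off mid-record (e.g. the collector died just after
          // creating it); whatever was copied so far is still a valid SequenceFile.
          log.warn("Reached unexpected end of .chukwa file, continuing", e);
        }
        // If copiedChunks is 0, the recovered file contains only a header; it could be
        // deleted instead of renamed to .done, depending on what downstream jobs expect.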

        Ahmed Fathalla added a comment -

        Did anyone have the time to look at this patch?

        Jerome Boulon added a comment -

        Could you move:

        • seqFileReader.close();
        • seqFileWriter.close();
        • newOutputStr.close();

        before trying to rename/delete, just to avoid any late exception?

        Do we want to move any invalid .chukwa file to an InErrorDirectory, so we can manually inspect the file later?

        Did you test with an empty SeqFile? I had to add a specific test for this, so it may be good to cover that case here.

        Other than that the patch looks good. I'll give it a try as soon as I can.
        Thanks!

        Ahmed Fathalla added a comment -

        I've uploaded a new patch where I have moved the close() methods as Jerome advised. I've also moved the cleanup code that was in the unit test into a separate tearDown() method.

        I've tested my code with an empty .chukwa file and it correctly converts it to .done.

        Regarding creating an InErrorDirectory, do we actually need that, or would it be an unnecessary complication? This also depends on how you would define an invalid .chukwa file.

        Ahmed Fathalla added a comment -

        Changing the status of the JIRA to patch available.

        Ari Rabkin added a comment -

        OK. Finally had a chance to review this. Looks good, and sorry for the delay – crazy busy at my end.

        • Can you say a bit about how you tested it?
        • I also don't quite understand when it gets invoked. Just from the localToHDFSMover? That would mean that the default configuration of "writers writing directly to HDFS" won't be affected. That's OK for now, but we should open a new issue to ultimately remove that limitation.

        --Ari

        Ahmed Fathalla added a comment -

        Hey, Ari. Thanks for reviewing the patch.

        I tested it manually by stopping the collector while it was writing .chukwa files and then starting it again; this leaves .chukwa files in the local collector directory. When the collector is restarted, the CopySequenceFile class I implemented correctly copies the contents into a valid sequence file and renames it to .done (except in the rare case I noted in previous comments).

        I also implemented a unit test "TestCopySequenceFile" which creates a .chukwa file and converts it into a valid .done file using CopySequenceFile.

        Yes, this implementation only covers the case of having a local collector directory. I suggest we address the issue of writing directly to HDFS in a separate JIRA.

        Ari Rabkin added a comment -

        OK. Good. +1 to commit and will do so within the next few days barring objection.

        Jerome Boulon added a comment -

        The issue with HDFS writers is more complicated than it looks, since every collector can see the same files at any given time. Extra steps are needed to avoid duplicating data in a distributed environment, so that should be handled in a separate JIRA.
        /Jerome.

        Ari Rabkin added a comment -

        I just committed this. Thanks, Ahmed!!


          People

          • Assignee: Ahmed Fathalla
          • Reporter: Andy Konwinski
          • Votes: 0
          • Watchers: 2
