CollectorSink doesn't properly pass the format parameter down to the EscapedCustomDfs sink.
For example, this works fine and produces a Snappy-compressed sequence file:
collectorSource(54001) | escapedCustomDfs("hdfs://hadoop1-m1:8020/", "test", seqfile("SnappyCodec") );
However, this ignores the format parameter and uses the codec defined in flume-conf.xml instead:
collectorSource(54001) | collectorSink("hdfs://hadoop1-m1:8020/", "test-", 600000, seqfile("SnappyCodec") );
By itself this bug would not be very serious. The problem is that escapedCustomDfs and customDfs use the same compressor and apply it to the whole file, in addition to the compression done natively by the sequence file. This leaves the sequence file double-compressed and invalid.
As far as I can tell, the only way to get a valid compressed sequence file is to set flume.collector.dfs.compress.codec to "None" in flume-site.xml and use the format parameter to specify the compression for the sequence file. With collectorSink that doesn't work either, because the format parameter is never passed down.
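
For reference, a minimal sketch of the flume-site.xml override described above. It assumes the usual Hadoop-style property layout; the property name and the "None" value come from this report, and everything else in the file is left out:

```xml
<configuration>
  <!-- Turn off whole-file compression in the DFS sinks so that only
       the sequence file's native compression is applied. -->
  <property>
    <name>flume.collector.dfs.compress.codec</name>
    <value>None</value>
  </property>
</configuration>
```

With this in place, the escapedCustomDfs example above should write a sequence file compressed only by SnappyCodec. The collectorSink example would presumably still ignore its format argument until the bug is fixed.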