Flume / FLUME-2096

HDFS sink not appending to a file; continuously rolling files


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.3.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources

    Description

      Hi guys,

      We are sending JSON events from our pipeline into a Flume HTTP source.
      We have written a custom multiplexing channel selector and a custom sink
      serializer. The events are routed into the correct channels and consumed
      by the sinks without problems. The custom serializer takes a JSON event
      and outputs a CSV line. Files are being written to S3 (using s3n as the
      HDFS filesystem), but rather than events being appended to one CSV file,
      each event seems to generate its own CSV file.

      The output is what I would expect with rollCount = 1; however, we do
      occasionally get several events (maybe 4) written per CSV. Please see
      below for the config.

      Ideally we want to use a rollInterval of 24 hours, so that a new .csv
      file is started every 24 hours but events are flushed to the current CSV
      file soon after they are sent. In other words, one CSV per day that is
      continually appended with whatever events we send. However, we found that
      with a rollInterval of 24 hours the events were not being flushed often
      enough.

      Is this a bug?

      FYI: we have amended the config below to include hdfs.rollCount = 0 and hdfs.rollSize = 0, so those should not be falling back to their defaults, and we still have the same issue.
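      The sketch below shows how the three HDFS sink roll triggers combine. It is a simplified model written for illustration, not Flume's BucketWriter code. It assumes that each trigger is checked independently, that a value of 0 disables a trigger, and that hdfs.rollInterval is in seconds. It also assumes omitted properties fall back to the documented Flume defaults (hdfs.rollInterval = 30 s, hdfs.rollSize = 1024 bytes, hdfs.rollCount = 10 events). The class name, method name and byte counts are hypothetical.

```java
import java.time.Duration;

// Hypothetical, simplified model of the HDFS sink's roll triggers (not Flume source code).
public class RollPolicySketch {
    // Documented Flume defaults, assumed to apply when a property is omitted.
    static final long DEFAULT_ROLL_INTERVAL_SECONDS = 30;
    static final long DEFAULT_ROLL_SIZE_BYTES = 1024;
    static final long DEFAULT_ROLL_COUNT_EVENTS = 10;

    // True if any enabled trigger has been reached; a setting of 0 disables that trigger.
    static boolean shouldRoll(long eventsWritten, long bytesWritten, long secondsOpen,
                              long rollCount, long rollSize, long rollIntervalSeconds) {
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        boolean bySize = rollSize > 0 && bytesWritten >= rollSize;
        boolean byTime = rollIntervalSeconds > 0 && secondsOpen >= rollIntervalSeconds;
        return byCount || bySize || byTime;
    }

    public static void main(String[] args) {
        long oneDay = Duration.ofHours(24).getSeconds();          // 86400 seconds
        long configured = Duration.ofSeconds(20400).toMinutes();  // the rollInterval used in this config

        // Illustrative numbers: 4 events totalling 1100 bytes exceed the default rollSize of 1024.
        System.out.println(shouldRoll(4, 1100, 5, DEFAULT_ROLL_COUNT_EVENTS,
                DEFAULT_ROLL_SIZE_BYTES, DEFAULT_ROLL_INTERVAL_SECONDS));   // true
        // rollCount = 0, rollSize = 0, rollInterval = 24 h: the same file stays open...
        System.out.println(shouldRoll(4, 1100, 5, 0, 0, oneDay));            // false
        // ...until a full day has elapsed.
        System.out.println(shouldRoll(50_000, 10_000_000, oneDay, 0, 0, oneDay)); // true
        System.out.println("hdfs.rollInterval = 20400 is " + configured
                + " minutes; 24 hours is " + oneDay + " seconds");
    }
}
```

      Under these assumptions, 24 hours corresponds to hdfs.rollInterval = 86400, while the 20400 used in the config below is 340 minutes (5 h 40 min).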

      Thanks.

      Josh

      # Sources ###################################################
      agent.sources = http
      agent.sources.http.type = http
      agent.sources.http.bind = 0.0.0.0
      agent.sources.http.port = 4444
      agent.sources.http.channels = cappucino_s3_aggregate_profile_channel default_s3_channel cappucino_s3_trip_summary_channel
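      No hdfs handler is set on the HTTP source above, so it uses the default JSON handler. Per the Flume user guide, that handler accepts a JSON array of events, each with a "headers" map and a "body" string. The following is a minimal client sketch under that assumption. The target URL, the log_type header value and the body are placeholders, and the hand-rolled escaping only covers what this example needs.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical client for the HTTP source's default JSON handler (requires Java 11+).
public class HttpSourceClientSketch {
    // Escape a Java string as a JSON string literal (quotes, backslashes, control characters).
    static String jsonString(String s) {
        StringBuilder out = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) out.append(String.format("\\u%04x", (int) c));
                    else out.append(c);
            }
        }
        return out.append('"').toString();
    }

    // One event in the layout {"headers":{...},"body":"..."}.
    static String event(Map<String, String> headers, String body) {
        StringBuilder out = new StringBuilder("{\"headers\":{");
        boolean first = true;
        for (Map.Entry<String, String> h : headers.entrySet()) {
            if (!first) out.append(',');
            out.append(jsonString(h.getKey())).append(':').append(jsonString(h.getValue()));
            first = false;
        }
        return out.append("},\"body\":").append(jsonString(body)).append('}').toString();
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("log_type", "trip_summary"); // placeholder header for the custom selector
        String payload = "[" + event(headers, "{\"distance\":12.5}") + "]";

        // POST the batch to the source configured above (port 4444; host is a placeholder).
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:4444"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
    }
}
```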
      # Interceptors #################################################
      agent.sources.http.interceptors = itime ihost
      agent.sources.http.interceptors.itime.type = timestamp
      agent.sources.http.interceptors.ihost.type = host
      agent.sources.http.interceptors.ihost.useIP = false
      agent.sources.http.interceptors.ihost.preserveExisting = false
      agent.sources.http.interceptors.ihost.hostHeader = hostname

      # Multiplex Channels Mapping ######################################
      agent.sources.http.selector.type = com.mydrivesolutions.flume.serialization.PipelineEventsChannelSelector
      agent.sources.http.selector.default = default_s3_channel

      # Channels ########################################################
      agent.channels = cappucino_s3_aggregate_profile_channel cappucino_s3_trip_summary_channel default_s3_channel

      agent.channels.cappucino_s3_aggregate_profile_channel.type = file
      agent.channels.cappucino_s3_aggregate_profile_channel.capacity = 10000000
      agent.channels.cappucino_s3_aggregate_profile_channel.checkpointDir = /mnt/flume/cappucino_s3_aggregate_profile/checkpoint
      agent.channels.cappucino_s3_aggregate_profile_channel.dataDirs = /mnt/flume/cappucino_s3_aggregate_profile/data

      agent.channels.cappucino_s3_trip_summary_channel.type = file
      agent.channels.cappucino_s3_trip_summary_channel.capacity = 10000000
      agent.channels.cappucino_s3_trip_summary_channel.checkpointDir = /mnt/flume/cappucino_s3_trip_summary/checkpoint
      agent.channels.cappucino_s3_trip_summary_channel.dataDirs = /mnt/flume/cappucino_s3_trip_summary/data

      # Sinks ###########################################################
      agent.sinks = cappucino_s3_aggregate_profile_sink1 cappucino_s3_aggregate_profile_sink2 cappucino_s3_trip_summary_sink1 cappucino_s3_trip_summary_sink2

      # Serialize JSON events from the pipeline and write CSV to HDFS (we are
      # using the S3 native filesystem as HDFS)
      ###############################################################################

      # cappucino_s3_aggregate_profile sinks #################################################
      agent.sinks.cappucino_s3_aggregate_profile_sink1.type = hdfs
      agent.sinks.cappucino_s3_aggregate_profile_sink1.channel = cappucino_s3_aggregate_profile_channel
      agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.path = s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
      agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileType = DataStream
      agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.writeFormat = Text
      agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
      agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.columns = log_type reporting_bucket subscription_id
      agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.format = DriverProfile
      agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.delimiter = ,
      agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.appendNewline = false
      agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.distanceMeasure = MILES
      agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.maxOpenFiles = 5000
      agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollInterval = 20400
      agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.callTimeout = 60000
      agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileSuffix = .csv
      agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.inUseSuffix = .tmp
      agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M
      agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.timeZone = UTC

      agent.sinks.cappucino_s3_aggregate_profile_sink2.type = hdfs
      agent.sinks.cappucino_s3_aggregate_profile_sink2.channel = cappucino_s3_aggregate_profile_channel
      agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.path = s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
      agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileType = DataStream
      agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.writeFormat = Text
      agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
      agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.columns = log_type reporting_bucket subscription_id
      agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.format = DriverProfile
      agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.delimiter = ,
      agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.appendNewline = false
      agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.distanceMeasure = MILES
      agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.maxOpenFiles = 5000
      agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.rollInterval = 20400
      agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.callTimeout = 60000
      agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileSuffix = .csv
      agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.inUseSuffix = .tmp
      agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M
      agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.timeZone = UTC

      # cappucino_s3_trip_summary sinks #################################################
      agent.sinks.cappucino_s3_trip_summary_sink1.type = hdfs
      agent.sinks.cappucino_s3_trip_summary_sink1.channel = cappucino_s3_trip_summary_channel
      agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.path = s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
      agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileType = DataStream
      agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.writeFormat = Text
      agent.sinks.cappucino_s3_trip_summary_sink1.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
      agent.sinks.cappucino_s3_trip_summary_sink1.serializer.columns = log_type reporting_bucket subscription_id
      agent.sinks.cappucino_s3_trip_summary_sink1.serializer.format = BodyCSV
      agent.sinks.cappucino_s3_trip_summary_sink1.serializer.delimiter = ,
      agent.sinks.cappucino_s3_trip_summary_sink1.serializer.appendNewline = false
      agent.sinks.cappucino_s3_trip_summary_sink1.serializer.distanceMeasure = MILES
      agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.maxOpenFiles = 5000
      agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.rollInterval = 20400
      agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.callTimeout = 60000
      agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileSuffix = .csv
      agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.inUseSuffix = .tmp
      agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.filePrefix = TripSummary.%y-%m-%d.%H.%M
      agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.timeZone = UTC

      agent.sinks.cappucino_s3_trip_summary_sink2.type = hdfs
      agent.sinks.cappucino_s3_trip_summary_sink2.channel = cappucino_s3_trip_summary_channel
      agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.path = s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
      agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileType = DataStream
      agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.writeFormat = Text
      agent.sinks.cappucino_s3_trip_summary_sink2.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
      agent.sinks.cappucino_s3_trip_summary_sink2.serializer.columns = log_type reporting_bucket subscription_id
      agent.sinks.cappucino_s3_trip_summary_sink2.serializer.format = BodyCSV
      agent.sinks.cappucino_s3_trip_summary_sink2.serializer.delimiter = ,
      agent.sinks.cappucino_s3_trip_summary_sink2.serializer.appendNewline = false
      agent.sinks.cappucino_s3_trip_summary_sink2.serializer.distanceMeasure = MILES
      agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.maxOpenFiles = 5000
      agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.rollInterval = 20400
      agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.callTimeout = 60000
      agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileSuffix = .csv
      agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.inUseSuffix = .tmp
      agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.filePrefix = TripSummary.%y-%m-%d.%H.%M
      agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.timeZone = UTC
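      Both sink sections build hdfs.path and hdfs.filePrefix from date escapes. These are resolved from the timestamp header added by the itime interceptor, in UTC because hdfs.timeZone = UTC. The sketch below reproduces that substitution for the escapes used here (%Y, %y, %m, %d, %H, %M). It is a hypothetical re-implementation, not Flume's BucketPath code. It also assumes, as we understand the 1.3.x HDFS sink, that one file is kept open per distinct resolved path plus prefix. The class and method names are made up for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical re-implementation of the date escapes used in this config (not Flume's BucketPath).
public class EscapeSketch {
    // Escape letter -> java.time pattern (%y is a two-digit year, %M is minutes).
    private static final Map<Character, String> PATTERNS = Map.of(
            'Y', "yyyy", 'y', "yy", 'm', "MM", 'd', "dd", 'H', "HH", 'M', "mm");

    // Replace each supported %X escape with the event timestamp formatted in UTC.
    static String resolve(String template, long timestampMillis) {
        Instant ts = Instant.ofEpochMilli(timestampMillis);
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < template.length(); i++) {
            char c = template.charAt(i);
            if (c == '%' && i + 1 < template.length() && PATTERNS.containsKey(template.charAt(i + 1))) {
                String pattern = PATTERNS.get(template.charAt(i + 1));
                out.append(DateTimeFormatter.ofPattern(pattern).withZone(ZoneOffset.UTC).format(ts));
                i++; // skip the escape letter
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    // Count distinct resolved path/prefix combinations for events spaced stepMillis apart.
    static int distinctFiles(String path, String prefix, long startMillis, long stepMillis, int events) {
        Set<String> keys = new HashSet<>();
        for (int k = 0; k < events; k++) {
            long ts = startMillis + k * stepMillis;
            keys.add(resolve(path, ts) + "/" + resolve(prefix, ts));
        }
        return keys.size();
    }

    public static void main(String[] args) {
        String path = "s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d";
        long start = 1_372_636_800_000L; // 2013-07-01T00:00:00Z, an arbitrary example time
        // prints s3n://mydrive-cappucino-reports/driver-profiles/2013-07-01/DriverProfile.13-07-01.00.00
        System.out.println(resolve(path, start) + "/" + resolve("DriverProfile.%y-%m-%d.%H.%M", start));
        // 20 events, 30 s apart (10 minutes of traffic)
        System.out.println(distinctFiles(path, "DriverProfile.%y-%m-%d.%H.%M", start, 30_000, 20)); // 10
        System.out.println(distinctFiles(path, "DriverProfile.%y-%m-%d", start, 30_000, 20));       // 1
    }
}
```

      Under that assumption, the .%H.%M part of the prefix resolves to a new name every minute, so ten minutes of traffic would be spread over ten files. A prefix carrying only %y-%m-%d resolves to one name per day.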

      # SinkGroups ###########################################################
      agent.sinkgroups = cappucino_s3_aggregate_profile_sinkgroup cappucino_s3_trip_summary_sinkgroup

      # cappucino_s3_aggregate_profile failover sink group ##########################################
      agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.sinks = cappucino_s3_aggregate_profile_sink1 cappucino_s3_aggregate_profile_sink2
      agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.type = failover
      agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink1 = 10
      agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink2 = 5
      agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.maxpenalty = 30000

      # cappucino_s3_trip_summary failover sink group ##########################################
      agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.sinks = cappucino_s3_trip_summary_sink1 cappucino_s3_trip_summary_sink2
      agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.type = failover
      agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink1 = 10
      agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink2 = 5
      agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.maxpenalty = 30000
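      Each sink group above pairs a primary sink (priority 10) with a backup (priority 5). As the Flume documentation describes the failover processor, events go to the highest-priority available sink. A failed sink is moved into a cool-down, with the back-off capped at maxpenalty (milliseconds). The sketch below models only that selection logic. The 1 s base penalty and the doubling per consecutive failure are assumptions. The class is hypothetical, not Flume's FailoverSinkProcessor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of failover sink selection (not Flume's FailoverSinkProcessor).
public class FailoverSketch {
    static final long BASE_PENALTY_MS = 1000; // assumption: not set anywhere in this config

    private static final class Slot {
        final String name;
        final int priority;
        int consecutiveFailures;
        long coolDownUntilMs;

        Slot(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    private final List<Slot> slots = new ArrayList<>();
    private final long maxPenaltyMs;

    FailoverSketch(long maxPenaltyMs) {
        this.maxPenaltyMs = maxPenaltyMs;
    }

    void addSink(String name, int priority) {
        slots.add(new Slot(name, priority));
    }

    // Highest-priority sink that is not cooling down, or null if none is available.
    String activeSink(long nowMs) {
        return slots.stream()
                .filter(s -> s.coolDownUntilMs <= nowMs)
                .max(Comparator.comparingInt(s -> s.priority))
                .map(s -> s.name)
                .orElse(null);
    }

    // Put a failed sink into cool-down; the penalty doubles per consecutive failure, capped at maxPenaltyMs.
    long reportFailure(String name, long nowMs) {
        Slot s = find(name);
        s.consecutiveFailures++;
        long penalty = Math.min(maxPenaltyMs, BASE_PENALTY_MS << Math.min(s.consecutiveFailures, 20));
        s.coolDownUntilMs = nowMs + penalty;
        return penalty;
    }

    // A successful delivery clears the failure history.
    void reportSuccess(String name) {
        find(name).consecutiveFailures = 0;
    }

    private Slot find(String name) {
        for (Slot s : slots) {
            if (s.name.equals(name)) return s;
        }
        throw new IllegalArgumentException("unknown sink: " + name);
    }

    public static void main(String[] args) {
        FailoverSketch group = new FailoverSketch(30000); // processor.maxpenalty = 30000
        group.addSink("cappucino_s3_trip_summary_sink1", 10);
        group.addSink("cappucino_s3_trip_summary_sink2", 5);

        System.out.println(group.activeSink(0));    // prints ..._sink1
        group.reportFailure("cappucino_s3_trip_summary_sink1", 0); // 2000 ms cool-down
        System.out.println(group.activeSink(1000)); // prints ..._sink2 while sink1 cools down
        System.out.println(group.activeSink(2000)); // prints ..._sink1 again
    }
}
```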

People

    • Assignee: Unassigned
    • Reporter: Josh Myers
    • Votes: 0
    • Watchers: 2
