  Apache Gobblin / GOBBLIN-88

Gobblin fails when pulling Kafka to HDFS


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved

    Description

      I'm having trouble pulling Kafka data to HDFS using Gobblin.

      My environment:

      • Hadoop 2.7.1 (vanilla version)
      • Gobblin 0.7.0 (built from source)

      I built Gobblin myself with the -PhadoopVersion=2.7.1 option to match my Hadoop version.
      (I first tried the release version, but it failed.)

      However, gobblin-mapreduce.sh fails with the following messages:
      `2016-08-11 22:51:09 KST ERROR [main] gobblin.runtime.AbstractJobLauncher 321 - Failed to launch and run job job_Kafka2HDFS_1470923459905: java.io.FileNotFoundException: File /user/chkim/gobblin/Kafka2HDFS/job.state does not exist
      java.io.FileNotFoundException: File /user/chkim/gobblin/Kafka2HDFS/job.state does not exist`

      The log file contains:

      ```
      ...
      2016-08-11 22:51:08 KST INFO [TaskStateCollectorService STARTING] gobblin.runtime.TaskStateCollectorService 92 - Starting the TaskStateCollectorService
      2016-08-11 22:51:08 KST INFO [main] gobblin.runtime.mapreduce.MRJobLauncher 203 - Launching Hadoop MR job Gobblin-Kafka2HDFS
      2016-08-11 22:51:08 KST INFO [main] org.apache.hadoop.conf.Configuration 1173 - session.id is deprecated. Instead, use dfs.metrics.session-id
      2016-08-11 22:51:08 KST INFO [main] org.apache.hadoop.metrics.jvm.JvmMetrics 76 - Initializing JVM Metrics with processName=JobTracker, sessionId=
      2016-08-11 22:51:09 KST INFO [main] org.apache.hadoop.mapreduce.JobSubmitter 249 - Cleaning up the staging area file:/tmp/hadoop-chkim/mapred/staging/chkim82482447/.staging/job_local82482447_0001
      2016-08-11 22:51:09 KST INFO [TaskStateCollectorService STOPPING] gobblin.runtime.TaskStateCollectorService 98 - Stopping the TaskStateCollectorService
      2016-08-11 22:51:09 KST WARN [TaskStateCollectorService STOPPING] gobblin.runtime.TaskStateCollectorService 119 - Output task state path /user/chkim/gobblin/Kafka2HDFS/output/job_Kafka2HDFS_1470923459905 does not exist
      2016-08-11 22:51:09 KST INFO [main] gobblin.runtime.mapreduce.MRJobLauncher 478 - Deleted working directory /user/chkim/gobblin/Kafka2HDFS
      2016-08-11 22:51:09 KST ERROR [main] gobblin.runtime.AbstractJobLauncher 321 - Failed to launch and run job job_Kafka2HDFS_1470923459905: java.io.FileNotFoundException: File /user/chkim/gobblin/Kafka2HDFS/job.state does not exist
      java.io.FileNotFoundException: File /user/chkim/gobblin/Kafka2HDFS/job.state does not exist
      at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
      at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:819)
      at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:596)
      at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
      at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)
      at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224)
      at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93)
      at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57)
      at org.apache.hadoop.mapreduce.JobResourceUploader.uploadFiles(JobResourceUploader.java:179)
      at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:95)
      at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:190)
      at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
      at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:415)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
      at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
      at gobblin.runtime.mapreduce.MRJobLauncher.runWorkUnits(MRJobLauncher.java:204)
      at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:296)
      at gobblin.runtime.mapreduce.CliMRJobLauncher.launchJob(CliMRJobLauncher.java:84)
      at gobblin.runtime.mapreduce.CliMRJobLauncher.run(CliMRJobLauncher.java:61)
      at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
      at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
      at gobblin.runtime.mapreduce.CliMRJobLauncher.main(CliMRJobLauncher.java:106)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
      at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
      2016-08-11 22:51:09 KST INFO [main] gobblin.util.ExecutorsUtils 125 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@1d1faa2d[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
      ...
      ```
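In the trace above, job.state is looked up through RawLocalFileSystem, and the staging area is a file:/tmp/... path under a job_local... ID, the ID form Hadoop's LocalJobRunner uses. One plausible reading, not confirmed in this thread, is that the unqualified path /user/chkim/gobblin/Kafka2HDFS/job.state was resolved against a local default filesystem instead of the HDFS set in fs.uri. The sketch below models that resolution rule with plain java.net.URI; PathQualification and qualify are hypothetical names (not Hadoop or Gobblin APIs), and hdfs://namenode:8020/ is a placeholder.

```java
import java.net.URI;

// Illustration only: hypothetical names, not part of Hadoop or Gobblin.
// Models the general rule that a path with no scheme is interpreted
// relative to whatever default filesystem URI is in effect.
class PathQualification {

    // Returns the path unchanged if it already names a filesystem (has a scheme);
    // otherwise resolves it against the given default filesystem URI.
    static URI qualify(String path, URI defaultFs) {
        URI p = URI.create(path);
        if (p.getScheme() != null) {
            return p;
        }
        return defaultFs.resolve(p);
    }

    public static void main(String[] args) {
        String jobState = "/user/chkim/gobblin/Kafka2HDFS/job.state";

        // Default filesystem is the local disk (e.g. no cluster config picked up).
        URI onLocal = qualify(jobState, URI.create("file:///"));
        // Default filesystem is an HDFS namenode (placeholder host and port).
        URI onHdfs = qualify(jobState, URI.create("hdfs://namenode:8020/"));

        System.out.println(onLocal.getScheme() + " " + onLocal.getPath());
        System.out.println(onHdfs);
    }
}
```

The same string lands on two different filesystems depending only on the default URI, which is the kind of mismatch that would make a file written to HDFS invisible to a lookup on the local disk.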

      The job configuration file:

      ```
      job.name=Kafka2HDFS
      job.group=Gobblin
      job.description=Gobblin
      job.lock.enable=false

      kafka.brokers=10.96.250.146:9092,10.96.250.147:9092,10.96.250.148:9092

      source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
      extract.namespace=gobblin.extract.kafka

      topic.whitelist=json_.*
      writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
      writer.file.path.type=tablename
      writer.destination.type=HDFS
      writer.output.format=txt

      fs.uri=<my HDFS domain>
      writer.fs.uri=<my HDFS domain>
      state.store.fs.uri=<my HDFS domain>

      data.publisher.type=gobblin.publisher.BaseDataPublisher

      mr.job.max.mappers=1

      mr.job.root.dir=/user/chkim/gobblin/
      state.store.dir=/user/chkim/gobblin/state-store
      task.data.root.dir=/user/chkim/gobblin/jobs/task-data
      data.publisher.final.dir=/user/chkim/gobblin/job-output
      metrics.reporting.file.enabled=true
      metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
      metrics.reporting.file.suffix=txt

      bootstrap.with.offset=earliest

      ```
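If the unqualified-path reading is right, a quick check is which directory settings in a job file carry no URI scheme. The sketch below is a hypothetical helper, not a Gobblin feature; its key list is copied from the job file above and namenode:8020 is a placeholder. Whether fully qualifying these paths avoids the error is not established in this thread.

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical diagnostic, not part of Gobblin: lists directory settings whose
// values have no URI scheme and so depend on the default filesystem in effect.
class UnqualifiedDirCheck {

    // Directory-valued keys copied from the job configuration above.
    static final List<String> DIR_KEYS = List.of(
            "mr.job.root.dir",
            "state.store.dir",
            "task.data.root.dir",
            "data.publisher.final.dir");

    static List<String> unqualifiedDirs(Properties props) {
        List<String> result = new ArrayList<>();
        for (String key : DIR_KEYS) {
            String value = props.getProperty(key);
            // URI.create throws on characters such as '{', so values that still
            // contain ${...} placeholders must be substituted before checking.
            if (value != null && URI.create(value.trim()).getScheme() == null) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String jobFile = String.join("\n",
                "mr.job.root.dir=/user/chkim/gobblin/",
                "state.store.dir=/user/chkim/gobblin/state-store",
                "task.data.root.dir=hdfs://namenode:8020/user/chkim/gobblin/jobs/task-data");
        Properties props = new Properties();
        props.load(new StringReader(jobFile));
        // Prints the keys whose values are bare paths.
        System.out.println(unqualifiedDirs(props));
    }
}
```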

      In addition, I have succeeded in running it on Hadoop in pseudo-distributed mode with hdfs://localhost:9000.

      How can I fix this?
      Please let me know if you have any hints.

      Github Url : https://github.com/linkedin/gobblin/issues/1193
      Github Reporter : chanhkim
      Github Created At : 2016-08-11T14:58:32Z
      Github Updated At : 2017-01-12T05:00:33Z

      Comments


      nowell-jana wrote on 2016-11-30T08:00:29Z : Did you find a fix for this? Running into it myself

      Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-263807734


      chavdar wrote on 2016-11-30T14:19:36Z : Hi,

      Can you share your log?

      Thanks.

      Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-263884407


      nowell-jana wrote on 2016-11-30T23:02:55Z : 2016-11-30 22:57:13 UTC ERROR [main] gobblin.runtime.AbstractJobLauncher 321 - Failed to launch and run job job_gobblin-counter_1480546570415: java.io.FileNotFoundException: File /var/gobblin-data/work/working/gobblin-counter/job_gobblin-counter_1480546570415/job.state does not exist
      java.io.FileNotFoundException: File /var/gobblin-data/work/working/gobblin-counter/job_gobblin-counter_1480546570415/job.state does not exist
      at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534)
      at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
      at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
      at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
      at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)
      at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224)
      at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93)
      at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57)
      at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:269)
      at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:390)
      at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:483)
      at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1296)
      at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1293)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:415)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
      at org.apache.hadoop.mapreduce.Job.submit(Job.java:1293)
      at gobblin.runtime.mapreduce.MRJobLauncher.runWorkUnits(MRJobLauncher.java:203)
      at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:296)
      at gobblin.runtime.mapreduce.CliMRJobLauncher.launchJob(CliMRJobLauncher.java:84)
      at gobblin.runtime.mapreduce.CliMRJobLauncher.run(CliMRJobLauncher.java:61)
      at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
      at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
      at gobblin.runtime.mapreduce.CliMRJobLauncher.main(CliMRJobLauncher.java:106)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
      at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

      Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-264024786


      nowell-jana wrote on 2016-11-30T23:05:12Z : Interestingly, running in MapReduce mode against standalone Hadoop works fine, but when I pick up where I left off and run in MapReduce mode on EMR, I get this error. It's possible it is now looking in HDFS on the EMR master instead of on the filesystem of the machine driving the job; I'm looking into that.

      Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-264025264


      nowell-jana wrote on 2016-12-01T04:35:29Z : Here is my MapReduce configuration:

      ```
      ###############################################################################
      ###################### Gobblin MapReduce configurations #######################
      ###############################################################################

      # Thread pool settings for the task executor
      taskexecutor.threadpool.size=2
      taskretry.threadpool.coresize=1
      taskretry.threadpool.maxsize=2

      # File system URIs
      fs.uri=hdfs://<EMR Master Node IP>:8020
      writer.fs.uri=${fs.uri}
      state.store.fs.uri=file:///

      # S3 config
      data.publisher.fs.uri=${env:S3_BUCKET_URL}
      fs.s3a.access.key=${env:S3_ACCESS_KEY}
      fs.s3a.private.key=${env:S3_PRIVATE_KEY}

      # Writer related configuration properties
      writer.destination.type=HDFS
      writer.output.format=AVRO
      writer.staging.dir=${env:GOBBLIN_WORK_DIR}/task-staging
      writer.output.dir=${env:GOBBLIN_WORK_DIR}/task-output

      # Data publisher related configuration properties
      data.publisher.type=gobblin.publisher.BaseDataPublisher
      data.publisher.final.dir=${env:GOBBLIN_WORK_DIR}/job-output
      data.publisher.replace.final.dir=false

      # Directory where job/task state files are stored
      state.store.dir=${env:GOBBLIN_WORK_DIR}/state-store

      # Directory where error files from the quality checkers are stored
      qualitychecker.row.err.file=${env:GOBBLIN_WORK_DIR}/err

      # Directory where job locks are stored
      job.lock.dir=${env:GOBBLIN_WORK_DIR}/locks

      # Directory where metrics log files are stored
      metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics

      # Interval of task state reporting in milliseconds
      task.status.reportintervalinms=5000

      # MapReduce properties
      mr.job.root.dir=${env:GOBBLIN_WORK_DIR}/working
      ```

      Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-264076807


      nowell-jana wrote on 2016-12-01T04:39:20Z : To describe more clearly what I'm attempting: I'm trying to run in MapReduce mode (with `gobblin-mapreduce.sh`) from a separate machine, pointed at an EMR cluster.

      Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-264077206

          People

            Assignee: Unassigned
            Reporter: Abhishek Tiwari (abti)
            Votes: 0
            Watchers: 1
