Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Description
I am having trouble pulling Kafka data into HDFS using Gobblin.
My environment:
- Hadoop 2.7.1 (vanilla version)
- Gobblin 0.7.0 (build version)
I built my own Gobblin distribution with the `-PhadoopVersion=2.7.1` option to match my Hadoop version.
(At first I tried the release version, but it failed.)
However, `gobblin-mapreduce.sh` fails with the following message:
`2016-08-11 22:51:09 KST ERROR [main] gobblin.runtime.AbstractJobLauncher 321 - Failed to launch and run job job_Kafka2HDFS_1470923459905: java.io.FileNotFoundException: File /user/chkim/gobblin/Kafka2HDFS/job.state does not exist
java.io.FileNotFoundException: File /user/chkim/gobblin/Kafka2HDFS/job.state does not exist`
The log file contains:
```
...
2016-08-11 22:51:08 KST INFO [TaskStateCollectorService STARTING] gobblin.runtime.TaskStateCollectorService 92 - Starting the TaskStateCollectorService
2016-08-11 22:51:08 KST INFO [main] gobblin.runtime.mapreduce.MRJobLauncher 203 - Launching Hadoop MR job Gobblin-Kafka2HDFS
2016-08-11 22:51:08 KST INFO [main] org.apache.hadoop.conf.Configuration 1173 - session.id is deprecated. Instead, use dfs.metrics.session-id
2016-08-11 22:51:08 KST INFO [main] org.apache.hadoop.metrics.jvm.JvmMetrics 76 - Initializing JVM Metrics with processName=JobTracker, sessionId=
2016-08-11 22:51:09 KST INFO [main] org.apache.hadoop.mapreduce.JobSubmitter 249 - Cleaning up the staging area file:/tmp/hadoop-chkim/mapred/staging/chkim82482447/.staging/job_local82482447_0001
2016-08-11 22:51:09 KST INFO [TaskStateCollectorService STOPPING] gobblin.runtime.TaskStateCollectorService 98 - Stopping the TaskStateCollectorService
2016-08-11 22:51:09 KST WARN [TaskStateCollectorService STOPPING] gobblin.runtime.TaskStateCollectorService 119 - Output task state path /user/chkim/gobblin/Kafka2HDFS/output/job_Kafka2HDFS_1470923459905 does not exist
2016-08-11 22:51:09 KST INFO [main] gobblin.runtime.mapreduce.MRJobLauncher 478 - Deleted working directory /user/chkim/gobblin/Kafka2HDFS
2016-08-11 22:51:09 KST ERROR [main] gobblin.runtime.AbstractJobLauncher 321 - Failed to launch and run job job_Kafka2HDFS_1470923459905: java.io.FileNotFoundException: File /user/chkim/gobblin/Kafka2HDFS/job.state does not exist
java.io.FileNotFoundException: File /user/chkim/gobblin/Kafka2HDFS/job.state does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:819)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:596)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57)
at org.apache.hadoop.mapreduce.JobResourceUploader.uploadFiles(JobResourceUploader.java:179)
at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:95)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:190)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
at gobblin.runtime.mapreduce.MRJobLauncher.runWorkUnits(MRJobLauncher.java:204)
at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:296)
at gobblin.runtime.mapreduce.CliMRJobLauncher.launchJob(CliMRJobLauncher.java:84)
at gobblin.runtime.mapreduce.CliMRJobLauncher.run(CliMRJobLauncher.java:61)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at gobblin.runtime.mapreduce.CliMRJobLauncher.main(CliMRJobLauncher.java:106)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
2016-08-11 22:51:09 KST INFO [main] gobblin.util.ExecutorsUtils 125 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@1d1faa2d[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
...
```
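Several lines in this log suggest that the MR job was handled by Hadoop's local job runner rather than a cluster. The job ID is `job_local82482447_0001`, the staging area is under `file:/tmp/hadoop-chkim/mapred/staging`, and the failing lookup goes through `RawLocalFileSystem`. The Java sketch below is a hypothetical helper (`LocalRunnerLogCheck` is not part of Gobblin or Hadoop) that scans launcher log lines for those three markers. The marker strings are assumptions taken from this one log, not an exhaustive list.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper (not part of Gobblin or Hadoop): scans launcher log
 * lines for markers suggesting the MR job ran under Hadoop's local job runner.
 */
final class LocalRunnerLogCheck {

    // Substring marker -> what it suggests; insertion order keeps output stable.
    private static final Map<String, String> MARKERS = new LinkedHashMap<>();
    static {
        MARKERS.put("job_local", "job ID assigned by the local job runner");
        MARKERS.put("staging area file:", "staging area on the local file system");
        MARKERS.put("RawLocalFileSystem", "file lookup through the local file system");
    }

    /** Returns the description of every marker found anywhere in the log. */
    static Set<String> findLocalModeMarkers(List<String> logLines) {
        Set<String> found = new LinkedHashSet<>();
        for (String line : logLines) {
            for (Map.Entry<String, String> marker : MARKERS.entrySet()) {
                if (line.contains(marker.getKey())) {
                    found.add(marker.getValue());
                }
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Two lines taken from the log above.
        List<String> log = Arrays.asList(
            "2016-08-11 22:51:09 KST INFO [main] org.apache.hadoop.mapreduce.JobSubmitter 249 - Cleaning up the staging area file:/tmp/hadoop-chkim/mapred/staging/chkim82482447/.staging/job_local82482447_0001",
            "at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:596)");
        findLocalModeMarkers(log).forEach(System.out::println);
    }
}
```

On a job that really reached YARN, none of these markers would be expected. An empty result is therefore a quick sanity check, not proof that submission went to the cluster.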
The job configuration file:
```
job.name=Kafka2HDFS
job.group=Gobblin
job.description=Gobblin
job.lock.enable=false
kafka.brokers=10.96.250.146:9092,10.96.250.147:9092,10.96.250.148:9092
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
topic.whitelist=json_.*
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
fs.uri=<my HDFS domain>
writer.fs.uri=<my HDFS domain>
state.store.fs.uri=<my HDFS domain>
data.publisher.type=gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=1
mr.job.root.dir=/user/chkim/gobblin/
state.store.dir=/user/chkim/gobblin/state-store
task.data.root.dir=/user/chkim/gobblin/jobs/task-data
data.publisher.final.dir=/user/chkim/gobblin/job-output
metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
```
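In the trace above, the missing `job.state` is looked up through `RawLocalFileSystem`, that is, on the local file system, even though `fs.uri` points at HDFS. One possible explanation is that a scheme-less path such as `/user/chkim/gobblin/Kafka2HDFS/job.state` gets qualified against whichever file system is the default on the submitting side. This is an assumption and is not confirmed in this thread.

The sketch below uses only `java.net.URI` to show how the same path lands on two different file systems. `PathQualification` is a hypothetical name, and `hdfs://namenode:8020` is a hypothetical stand-in for `<my HDFS domain>`. Hadoop's own path qualification handles more cases than this.

```java
import java.net.URI;

/** Hypothetical illustration of qualifying a scheme-less path against a file-system URI. */
final class PathQualification {

    /**
     * Qualifies an absolute, scheme-less path against a file-system URI.
     * Paths that already carry a scheme are returned unchanged.
     */
    static URI qualify(String path, String fsUri) {
        URI p = URI.create(path);
        if (p.getScheme() != null) {
            return p; // already fully qualified, e.g. hdfs://host:8020/...
        }
        return URI.create(fsUri).resolve(p);
    }

    public static void main(String[] args) {
        String jobState = "/user/chkim/gobblin/Kafka2HDFS/job.state";
        // Hypothetical NameNode address standing in for "<my HDFS domain>".
        System.out.println(qualify(jobState, "hdfs://namenode:8020"));
        // Hadoop's built-in fs.defaultFS when no cluster configuration is found.
        System.out.println(qualify(jobState, "file:///"));
    }
}
```

The first call yields `hdfs://namenode:8020/user/chkim/gobblin/Kafka2HDFS/job.state`. The second yields `file:/user/chkim/gobblin/Kafka2HDFS/job.state`, which is the kind of path `RawLocalFileSystem` would be asked about.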
In addition, it works with Hadoop in pseudo-distributed mode (`hdfs://localhost:9000`).
How can I fix this?
Please let me know if you have any hints.
Github Url : https://github.com/linkedin/gobblin/issues/1193
Github Reporter : chanhkim
Github Created At : 2016-08-11T14:58:32Z
Github Updated At : 2017-01-12T05:00:33Z
Comments
nowell-jana wrote on 2016-11-30T08:00:29Z : Did you find a fix for this? Running into it myself
Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-263807734
chavdar wrote on 2016-11-30T14:19:36Z : Hi,
Can you share your log?
Thanks.
Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-263884407
nowell-jana wrote on 2016-11-30T23:02:55Z : 2016-11-30 22:57:13 UTC ERROR [main] gobblin.runtime.AbstractJobLauncher 321 - Failed to launch and run job job_gobblin-counter_1480546570415: java.io.FileNotFoundException: File /var/gobblin-data/work/working/gobblin-counter/job_gobblin-counter_1480546570415/job.state does not exist
java.io.FileNotFoundException: File /var/gobblin-data/work/working/gobblin-counter/job_gobblin-counter_1480546570415/job.state does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57)
at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:269)
at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:390)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:483)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1296)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1293)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1293)
at gobblin.runtime.mapreduce.MRJobLauncher.runWorkUnits(MRJobLauncher.java:203)
at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:296)
at gobblin.runtime.mapreduce.CliMRJobLauncher.launchJob(CliMRJobLauncher.java:84)
at gobblin.runtime.mapreduce.CliMRJobLauncher.run(CliMRJobLauncher.java:61)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at gobblin.runtime.mapreduce.CliMRJobLauncher.main(CliMRJobLauncher.java:106)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-264024786
nowell-jana wrote on 2016-11-30T23:05:12Z : It's interesting, I can run in mapreduce mode with hadoop in standalone fine, but attempting to pick up where I left off running in map reduce mode on EMR I'm getting this error. It's possible it's looking in HDFS on the EMR master now instead of on the filesystem of the machine driving it all- looking into that.
Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-264025264
nowell-jana wrote on 2016-12-01T04:35:29Z : Here is my mapreduce configuration:
```
###############################################################################
###################### Gobblin MapReduce configurations #######################
###############################################################################

# Thread pool settings for the task executor
taskexecutor.threadpool.size=2
taskretry.threadpool.coresize=1
taskretry.threadpool.maxsize=2

# File system URIs
fs.uri=hdfs://<EMR Master Node IP>:8020
writer.fs.uri=${fs.uri}
state.store.fs.uri=file:///

# S3 config
data.publisher.fs.uri=${env:S3_BUCKET_URL}
fs.s3a.access.key=${env:S3_ACCESS_KEY}
fs.s3a.private.key=${env:S3_PRIVATE_KEY}

# Writer related configuration properties
writer.destination.type=HDFS
writer.output.format=AVRO
writer.staging.dir=${env:GOBBLIN_WORK_DIR}/task-staging
writer.output.dir=${env:GOBBLIN_WORK_DIR}/task-output

# Data publisher related configuration properties
data.publisher.type=gobblin.publisher.BaseDataPublisher
data.publisher.final.dir=${env:GOBBLIN_WORK_DIR}/job-output
data.publisher.replace.final.dir=false

# Directory where job/task state files are stored
state.store.dir=${env:GOBBLIN_WORK_DIR}/state-store

# Directory where error files from the quality checkers are stored
qualitychecker.row.err.file=${env:GOBBLIN_WORK_DIR}/err

# Directory where job locks are stored
job.lock.dir=${env:GOBBLIN_WORK_DIR}/locks

# Directory where metrics log files are stored
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics

# Interval of task state reporting in milliseconds
task.status.reportintervalinms=5000

# MapReduce properties
mr.job.root.dir=${env:GOBBLIN_WORK_DIR}/working
```
Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-264076807
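This configuration mixes file systems. `fs.uri` and `writer.fs.uri` point at HDFS on the EMR master, while `state.store.fs.uri` is `file:///`. `mr.job.root.dir`, which judging by the error path is where the missing `job.state` lives, has no scheme at all.

The Java sketch below is a hypothetical helper (`ConfExpander`, not Gobblin's own resolver). It expands `${env:NAME}` and `${key}` placeholders and prints the scheme of each effective value. `10.0.0.1` is a placeholder for the elided EMR master address. `GOBBLIN_WORK_DIR=/var/gobblin-data/work` is an assumption inferred from the `job.state` path in the earlier log.

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not Gobblin's own resolver): expands ${env:NAME} and
 * ${key} placeholders so the effective file-system URIs can be compared.
 */
final class ConfExpander {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Expands each placeholder once; unknown names are left as written. */
    static String expand(String value, Properties props, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1);
            String replacement = name.startsWith("env:")
                ? env.get(name.substring("env:".length()))
                : props.getProperty(name);
            m.appendReplacement(out, Matcher.quoteReplacement(
                replacement != null ? replacement : m.group()));
        }
        m.appendTail(out);
        return out.toString();
    }

    /** Returns the URI scheme of a value, or "none" for a bare path. */
    static String scheme(String value) {
        try {
            String s = URI.create(value).getScheme();
            return s != null ? s : "none";
        } catch (IllegalArgumentException e) {
            return "unparseable";
        }
    }

    public static void main(String[] args) throws IOException {
        // Subset of the configuration above; 10.0.0.1 is a placeholder address.
        String conf = String.join("\n",
            "fs.uri=hdfs://10.0.0.1:8020",
            "writer.fs.uri=${fs.uri}",
            "state.store.fs.uri=file:///",
            "mr.job.root.dir=${env:GOBBLIN_WORK_DIR}/working");
        Properties props = new Properties();
        props.load(new StringReader(conf));
        // Assumed value, inferred from the job.state path in the earlier log.
        Map<String, String> env =
            Collections.singletonMap("GOBBLIN_WORK_DIR", "/var/gobblin-data/work");
        for (String key : new String[] {"fs.uri", "writer.fs.uri", "state.store.fs.uri", "mr.job.root.dir"}) {
            String value = expand(props.getProperty(key), props, env);
            System.out.println(key + " -> " + value + " (scheme: " + scheme(value) + ")");
        }
    }
}
```

Expansion is deliberately single-pass, so a placeholder whose value itself contains a placeholder is not followed. That is enough for the keys shown here.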
nowell-jana wrote on 2016-12-01T04:39:20Z : And to give a clearer description of what I'm attempting: I'm trying to run in mapreduce mode (with `gobblin-mapreduce.sh`) by pointing from a separate machine to an EMR cluster.
Github Url : https://github.com/linkedin/gobblin/issues/1193#issuecomment-264077206
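When submitting from a separate machine, the client-side Hadoop configuration generally has to point at the cluster for the job to run there. Hadoop's defaults are `fs.defaultFS=file:///` and `mapreduce.framework.name=local`, which select the local file system and the local job runner.

The Java sketch below is a hypothetical pre-flight check (`SiteXmlCheck` is not part of Gobblin or Hadoop). It reads `<property>` name/value pairs from `core-site.xml`/`mapred-site.xml` contents and flags those two defaults. It assumes the cluster runs MapReduce on YARN, as EMR does. It does not check other settings such as the ResourceManager address.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

/**
 * Hypothetical pre-flight check (not part of Gobblin or Hadoop): reads
 * name/value pairs from Hadoop *-site.xml content and flags settings that
 * would keep the job on the submitting machine.
 */
final class SiteXmlCheck {

    /** Parses <property><name/><value/></property> entries from one site file. */
    static Map<String, String> readProperties(String xml) {
        Map<String, String> props = new HashMap<>();
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList entries = doc.getElementsByTagName("property");
            for (int i = 0; i < entries.getLength(); i++) {
                Element entry = (Element) entries.item(i);
                Node name = entry.getElementsByTagName("name").item(0);
                Node value = entry.getElementsByTagName("value").item(0);
                if (name != null && value != null) { // skip incomplete entries
                    props.put(name.getTextContent().trim(), value.getTextContent().trim());
                }
            }
        } catch (Exception e) { // parser setup, XML syntax, or I/O failure
            throw new IllegalArgumentException("Could not parse site XML", e);
        }
        return props;
    }

    /** Flags Hadoop's local defaults for fs.defaultFS and mapreduce.framework.name. */
    static List<String> problems(Map<String, String> props) {
        List<String> issues = new ArrayList<>();
        String fs = props.getOrDefault("fs.defaultFS", "file:///");
        if (fs.startsWith("file:")) {
            issues.add("fs.defaultFS is local: " + fs);
        }
        String framework = props.getOrDefault("mapreduce.framework.name", "local");
        if (!"yarn".equals(framework)) {
            issues.add("mapreduce.framework.name is " + framework);
        }
        return issues;
    }

    public static void main(String[] args) {
        // Example contents; 10.0.0.1 is a placeholder for the EMR master address.
        String coreSite = "<configuration><property><name>fs.defaultFS</name>"
            + "<value>hdfs://10.0.0.1:8020</value></property></configuration>";
        String mapredSite = "<configuration></configuration>";
        Map<String, String> merged = new HashMap<>(readProperties(coreSite));
        merged.putAll(readProperties(mapredSite));
        problems(merged).forEach(System.out::println);
        // prints: mapreduce.framework.name is local
    }
}
```

Site files are merged before checking because `fs.defaultFS` normally lives in `core-site.xml` and `mapreduce.framework.name` in `mapred-site.xml`.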