Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Apache Gobblin 170807
Description
Hi,
I'm trying to set a time limit on Gobblin when extracting data from Kafka. My job config looks like this:
```
job.name=events
job.group=events
job.description=events
job.lock.enabled=true
job.schedule=0 0 0/1 1/1 * ? *
kafka.brokers=${env:KAFKA_BROKERS}
kafka.workunit.packer.type=BI_LEVEL
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
extract.limit.enabled=true
extract.limit.type=time
extract.limit.time.limit=120
extract.limit.time.limit.timeunit=minutes
converter.classes=com.custom.gobblin.GobblinBytesToAvroConverter
writer.partitioner.class=com.custom.gobblin.GobblinDailyPartitioner
writer.partition.pattern=YYYY/MM/dd
writer.partition.columns=timestamp
writer.builder.class=gobblin.writer.AvroDataWriterBuilder
writer.file.path.type=tablename
writer.partition.timezone=UTC
writer.destination.type=HDFS
writer.output.format=AVRO
writer.codec.type=SNAPPY
writer.deflate.level=6
writer.include.partition.in.file.names=false
mr.job.max.mappers=5
topic.whitelist=.*
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher
metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt
```
Looking at the docs, this should work, but I keep getting:
```
2017-03-06 17:15:01 ERROR Task:266 - Task task_events_1488816000695_68 failed
java.lang.IllegalArgumentException
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:77)
at gobblin.util.limiter.DefaultLimiterFactory.newLimiter(DefaultLimiterFactory.java:74)
at gobblin.runtime.TaskContext.getExtractor(TaskContext.java:112)
at gobblin.runtime.Task.run(Task.java:127)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
I'm using Gobblin 0.8.0.
Thanks.
Github Url : https://github.com/linkedin/gobblin/issues/1675
Github Reporter : guilhermef
Github Created At : 2017-03-06T18:23:07Z
Github Updated At : 2017-05-19T05:58:26Z
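The first line of the trace is a bare `java.lang.IllegalArgumentException` with no message. That is exactly what Guava's `Preconditions.checkArgument(boolean)` throws when its condition is false, which is why the log gives no hint about which setting is wrong. Below is a minimal, dependency-free sketch of that failure mode. It assumes that the time-based limiter requires an `extract.limit.timeLimit` key, which is consistent with the maintainer's reply in the comments. `BareCheckDemo` and `readTimeLimit` are hypothetical names. The local `checkArgument` only mirrors Guava's single-argument contract and is not Gobblin's code.

```java
import java.util.Properties;

// Hypothetical illustration only; this is not Gobblin's DefaultLimiterFactory.
final class BareCheckDemo {

  // Mirrors the contract of Guava's Preconditions.checkArgument(boolean):
  // a false condition produces an IllegalArgumentException with no message.
  static void checkArgument(boolean expression) {
    if (!expression) {
      throw new IllegalArgumentException();
    }
  }

  // Assumption: a time-based limiter refuses to start unless its limit key is present.
  static long readTimeLimit(Properties props, String requiredKey) {
    checkArgument(props.containsKey(requiredKey));
    return Long.parseLong(props.getProperty(requiredKey).trim());
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("extract.limit.type", "time");
    // Key spelled as in the job file above, so the required key is absent.
    props.setProperty("extract.limit.time.limit", "120");
    try {
      readTimeLimit(props, "extract.limit.timeLimit");
    } catch (IllegalArgumentException e) {
      // Throwable.toString() omits the ": message" part when the message is null,
      // so this prints only the class name, like the log line above.
      System.out.println(e);
    }
  }
}
```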
Comments
abti wrote on 2017-03-30T00:16:10Z : jira-assignee:lesun
jira-label:gobblin-os-github
Github Url : https://github.com/linkedin/gobblin/issues/1675#issuecomment-290264501
ibuenros wrote on 2017-03-30T00:46:36Z : Hello,
You are using the wrong keys. Please use extract.limit.timeLimit and
extract.limit.timeLimitTimeunit instead. If this is wrong in the docs,
can you please point us to the location so we can fix it?
Thanks,
Issac
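The fix can be sketched as a minimal standalone Java check. This is not part of Gobblin. It loads a job-properties fragment, reads the key names given in this reply, and rejects the dotted spellings from the original job file with a message that names the replacement key. The class `TimeLimitConfig`, its `parse` method, and the choice to require both keys are assumptions made for illustration. The thread does not say whether Gobblin falls back to a default unit when `extract.limit.timeLimitTimeunit` is omitted.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Gobblin: validates the time-limit settings of a job
// file using the key names given in the reply above.
final class TimeLimitConfig {

  static final String TIME_LIMIT_KEY = "extract.limit.timeLimit";
  static final String TIME_UNIT_KEY = "extract.limit.timeLimitTimeunit";

  // Dotted spellings from the original job file, paired with the keys that replace them.
  private static final String[][] MISSPELLED_KEYS = {
      {"extract.limit.time.limit", TIME_LIMIT_KEY},
      {"extract.limit.time.limit.timeunit", TIME_UNIT_KEY},
  };

  final long limit;
  final TimeUnit unit;

  private TimeLimitConfig(long limit, TimeUnit unit) {
    this.limit = limit;
    this.unit = unit;
  }

  static TimeLimitConfig parse(String jobProperties) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(jobProperties));
    } catch (IOException e) {
      // StringReader does no real I/O, but load() declares the checked exception.
      throw new UncheckedIOException(e);
    }
    // Fail with a descriptive message instead of a bare IllegalArgumentException.
    for (String[] pair : MISSPELLED_KEYS) {
      if (props.containsKey(pair[0]) && !props.containsKey(pair[1])) {
        throw new IllegalArgumentException("Unrecognized key " + pair[0] + "; use " + pair[1]);
      }
    }
    String limitText = require(props, TIME_LIMIT_KEY);
    // Assumption: the unit is spelled like a java.util.concurrent.TimeUnit constant, any case.
    String unitText = require(props, TIME_UNIT_KEY);
    return new TimeLimitConfig(
        Long.parseLong(limitText), TimeUnit.valueOf(unitText.toUpperCase(Locale.ROOT)));
  }

  private static String require(Properties props, String key) {
    String value = props.getProperty(key);
    if (value == null) {
      throw new IllegalArgumentException("Missing required key " + key);
    }
    // Properties.load keeps trailing whitespace in values, so trim it here.
    return value.trim();
  }

  public static void main(String[] args) {
    // The limiter section of the job file, rewritten with the corrected keys.
    String job = String.join("\n",
        "extract.limit.enabled=true",
        "extract.limit.type=time",
        "extract.limit.timeLimit=120",
        "extract.limit.timeLimitTimeunit=minutes");
    TimeLimitConfig cfg = parse(job);
    System.out.println(cfg.limit + " " + cfg.unit); // prints "120 MINUTES"
  }
}
```

The error message names the replacement key, so a misspelled key produces an actionable failure. By contrast, the trace in the report contains only the exception class name.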
Github Url : https://github.com/linkedin/gobblin/issues/1675#issuecomment-290268851
madiks wrote on 2017-05-19T05:58:26Z : I found the location in the docs: it's in Case Studies » Kafka-HDFS Ingestion:
https://github.com/linkedin/gobblin/blob/master/gobblin-docs/case-studies/Kafka-HDFS-Ingestion.md
https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/
Github Url : https://github.com/linkedin/gobblin/issues/1675#issuecomment-302615980