Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-10902

Python-KafkaIO is failing with error Failed to construct kafka consumer

Details

    Description

      Hello Team, I'm trying to use Python-KafkaIO connector to consume messages from Kafka (confluent Cloud) using authentication and getting the following error. Could you please help looking into this issue? 

       
      Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
      Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
       
      Here is the Kafka broker config I'm using: 

      consumer_config={'bootstrap.servers': bootstrap_servers, 'group.id': 'Read-BEAM-local-ParDo-1', 'session.timeout.ms': '6000', 'auto.offset.reset':'earliest', 'enable.auto.commit' : 'true', 'sasl.username': ‘sadfasdfasdf’,’sasl.password’:’asdfasdfasdfasdf’,’sasl.mechanisms': 'PLAIN', 'security.protocol': 'SASL_SSL'}
      

      Note: This works with no issues with Java-KafkaIO.  

       

      Full Log:
      --------
       
      {

        "textPayload": "Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer\n\torg.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)\n\torg.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:798)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1220)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:743)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1220)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.access$800(FnApiDoFnRunner.java:133)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1643)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:1812)\n\torg.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\torg.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\torg.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:718)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\torg.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)\n\torg.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\torg.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:295)\n\torg.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\torg.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tjava.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer\n\torg.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:123)\n\torg.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\torg.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:879)\n\torg.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:406)\n\torg.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:540)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:1812)\n\torg.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\torg.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:490)\nCaused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer\n\torg.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)\n\torg.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:604)\n\torg.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)\n\torg.apache.beam.sdk.io.kafka.KafkaUnboundedSource.split(KafkaUnboundedSource.java:64)\n\torg.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\torg.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\torg.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:879)\n\torg.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:406)\n\torg.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:540)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:1812)\n\torg.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\torg.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:490)\n\torg.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:798)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1220)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:743)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1220)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.access$800(FnApiDoFnRunner.java:133)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1643)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:1812)\n\torg.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\torg.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\torg.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:718)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\torg.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)\n\torg.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\torg.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:295)\n\torg.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\torg.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tjava.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set\n\torg.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)\n\torg.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)\n\torg.apache.kafka.common.security.JaasContext.load(JaasContext.java:78)\n\torg.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:103)\n\torg.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)\n\torg.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)\n\torg.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)\n\t... 38 more\n\npassed through:\n==>\n    dist_proc/dax/workflow/worker/fnapi_service.cc:619: process_bundle_response_and_done

      Attachments

        Activity

          People

            Unassigned Unassigned
            Gullapalli Ravi
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: