Apache NiFi / NIFI-7830

PutAzureDataLakeStorage fails on large files

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.12.0
    • Fix Version/s: 1.13.0
    • Component/s: Extensions
    • Labels: None

    Description

      For large files, the processor fails to send data:

      2020-09-20 11:47:56,287 ERROR [Timer-Driven Process Thread-231] o.a.n.p.a.s.PutAzureDataLakeStorage PutAzureDataLakeStorage[id=a71338e6-eb42-1fa2-aa26-c519d532099a] Failed to create file on Azure Data Lake Storage: com.azure.storage.file.datalake.models.DataLakeStorageException: Status code 413, "{"error":{"code":"RequestBodyTooLarge","message":"The request body is too large and exceeds the maximum permissible limit.\nRequestId:aa6743ef-401f-0035-5c65-8f518f000000\nTime:2020-09-20T15:46:43.1536078Z"}}"
      com.azure.storage.file.datalake.models.DataLakeStorageException: Status code 413, "{"error":{"code":"RequestBodyTooLarge","message":"The request body is too large and exceeds the maximum permissible limit.\nRequestId:aa6743ef-401f-0035-5c65-8f518f000000\nTime:2020-09-20T15:46:43.1536078Z"}}"
      	at sun.reflect.GeneratedConstructorAccessor2091.newInstance(Unknown Source)
      	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      	at com.azure.core.http.rest.RestProxy.instantiateUnexpectedException(RestProxy.java:320)
      	at com.azure.core.http.rest.RestProxy.lambda$ensureExpectedStatus$3(RestProxy.java:361)
      	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)
      	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1755)
      	at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:320)
      	at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:337)
      	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2317)
      	at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onSubscribe(MonoCacheTime.java:276)
      	at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:191)
      	at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
      	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
      	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
      	at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)
      	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
      	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
      	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
      	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
      	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112)
      	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
      	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
      	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:178)
      	at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:96)
      	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1755)
      	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:121)
      	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
      	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
      	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:366)
      	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:367)
      	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:423)
      	at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:607)
      	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
      	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
      	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
      	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518)
      	at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1279)
      	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1316)
      	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
      	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
      	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
      	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
      	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
      	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
      	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
      	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
      	at java.lang.Thread.run(Thread.java:745)
      	Suppressed: java.lang.Exception: #block terminated with an error
      		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
      		at reactor.core.publisher.Mono.block(Mono.java:1678)
      		at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:99)
      		at com.azure.storage.file.datalake.DataLakeFileClient.appendWithResponse(DataLakeFileClient.java:269)
      		at com.azure.storage.file.datalake.DataLakeFileClient.append(DataLakeFileClient.java:233)
      		at org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage.onTrigger(PutAzureDataLakeStorage.java:124)
      		at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
      		at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
      		at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
      		at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
      		at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
      		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      		at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      		... 1 common frames omitted
      

      Please see https://docs.microsoft.com/en-us/troubleshoot/azure/general/request-body-large

      There's a 4-MB limit for each call to the Azure Storage service. If your file is larger than 4 MB, you must break it in chunks. For more information, see Azure Storage scalability and performance targets.
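
A minimal sketch of the chunking described above, in plain Java with no Azure dependency. It is not the fix that was committed for this issue. `ChunkSink`, `RecordingSink`, `upload` and `MAX_CHUNK_SIZE` are hypothetical names introduced here for illustration, and the 4 MB value is simply the limit quoted above. In a real processor, `append` would presumably delegate to the `DataLakeFileClient.append` call seen in the stack trace, and `flush` to the client's flush operation.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

final class ChunkedAppendSketch {

    /** Assumed chunk size, taken from the 4 MB per-call limit quoted above. */
    static final int MAX_CHUNK_SIZE = 4 * 1024 * 1024;

    /** Hypothetical stand-in for the storage client; not an Azure SDK type. */
    interface ChunkSink {
        void append(byte[] data, int length, long offset) throws IOException;
        void flush(long totalLength) throws IOException;
    }

    /** Sends the stream as appends of at most chunkSize bytes, then flushes once. */
    static long upload(InputStream in, ChunkSink sink, int chunkSize) throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        byte[] buffer = new byte[chunkSize];
        long offset = 0;
        int filled;
        while ((filled = fill(in, buffer)) > 0) {
            sink.append(buffer, filled, offset);
            offset += filled;
        }
        sink.flush(offset);
        return offset;
    }

    /** Reads until the buffer is full or the stream ends; returns the bytes read. */
    private static int fill(InputStream in, byte[] buffer) throws IOException {
        int filled = 0;
        while (filled < buffer.length) {
            int n = in.read(buffer, filled, buffer.length - filled);
            if (n < 0) {
                break;
            }
            filled += n;
        }
        return filled;
    }

    /** Test double that records each call as text instead of sending anything. */
    static final class RecordingSink implements ChunkSink {
        final List<String> calls = new ArrayList<>();

        @Override
        public void append(byte[] data, int length, long offset) {
            calls.add("append offset=" + offset + " length=" + length);
        }

        @Override
        public void flush(long totalLength) {
            calls.add("flush length=" + totalLength);
        }
    }

    public static void main(String[] args) throws IOException {
        RecordingSink sink = new RecordingSink();
        upload(new ByteArrayInputStream(new byte[10 * 1024 * 1024]), sink, MAX_CHUNK_SIZE);
        sink.calls.forEach(System.out::println);
    }
}
```

With the 10 MB payload in `main`, the recording sink sees three appends (4194304, 4194304 and 2097152 bytes at offsets 0, 4194304 and 8388608) followed by a single flush at 10485760. Reading into a fixed-size buffer keeps memory use bounded by the chunk size regardless of the file size.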

            People

              Assignee: Peter Turcsanyi (turcsanyip)
              Reporter: Pierre Villard (pvillard)
              Votes: 8
              Watchers: 5

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 1.5h