Details

Type: Improvement
Status: Resolved
Priority: Major
Resolution: Won't Fix
Description
Hello,
We're using Spark ("org.apache.spark:spark-[catalyst|core|sql]_2.12:3.2.2") and Hadoop ("org.apache.hadoop:hadoop-common:3.3.3") and want to retrieve an object stored in a MinIO bucket (MinIO implements the S3 API). Spark relies on Hadoop for this operation.
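For reference, the read roughly looks like this (a minimal sketch; the endpoint, credentials and app name are placeholders, not our real values):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Placeholder endpoint and credentials for a MinIO deployment.
SparkSession spark = SparkSession.builder()
    .appName("minio-read")
    .config("spark.hadoop.fs.s3a.endpoint", "https://minio.example.com")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.access.key", "<access-key>")
    .config("spark.hadoop.fs.s3a.secret.key", "<secret-key>")
    .getOrCreate();

// Fails with the AccessDeniedException shown below, before any GET is issued.
Dataset<Row> df = spark.read().csv("s3a://minio-bucket/object");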
The MinIO bucket (which we don't manage) is configured with a very restrictive policy that only allows us to retrieve the object, and nothing else. Something like:
{ "statement": [ { "effect": "Allow", "action": [ "s3:GetObject" ], "resource": [ "arn:aws:s3:::minio-bucket/object" ] } ] }
Using the AWS CLI, we can retrieve the object without any issue.
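For comparison, the equivalent direct call with the AWS SDK for Java (which hadoop-aws bundles; endpoint and credentials are placeholders) should also succeed, since the only request it issues is the GetObject the policy allows:

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;

// Placeholder endpoint and credentials, as above.
AmazonS3 s3 = AmazonS3ClientBuilder.standard()
    .withEndpointConfiguration(
        new AwsClientBuilder.EndpointConfiguration("https://minio.example.com", "us-east-1"))
    .enablePathStyleAccess()
    .withCredentials(
        new AWSStaticCredentialsProvider(new BasicAWSCredentials("<access-key>", "<secret-key>")))
    .build();

// A single GetObject request: permitted by the restrictive policy.
S3Object object = s3.getObject("minio-bucket", "object");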
When we try with Spark's DataFrameReader, we receive an HTTP 403 response (access denied) from MinIO:
java.nio.file.AccessDeniedException: s3a://minio-bucket/object: getFileStatus on s3a://minio-bucket/object: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied. (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; ...
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3858)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$isDirectory$35(S3AFileSystem.java:4724)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.isDirectory(S3AFileSystem.java:4722)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:54)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:571)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:481)
    at com.soprabanking.dxp.pure.bf.dataaccess.S3Storage.loadDataset(S3Storage.java:55)
    at com.soprabanking.dxp.pure.bf.business.step.DatasetLoader.lambda$doLoad$3(DatasetLoader.java:148)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251)
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336)
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
    at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:325)
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2664)
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180)
    at com.jakewharton.retrofit2.adapter.reactor.BodyFlux$BodySubscriber.onComplete(BodyFlux.java:80)
    at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123)
    at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
    at reactor.core.publisher.FluxCreate$LatestAsyncSink.drain(FluxCreate.java:945)
    at reactor.core.publisher.FluxCreate$LatestAsyncSink.complete(FluxCreate.java:892)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
    at com.jakewharton.retrofit2.adapter.reactor.EnqueueSinkConsumer$DisposableCallback.onResponse(EnqueueSinkConsumer.java:52)
    at retrofit2.OkHttpCall$1.onResponse(OkHttpCall.java:161)
    at brave.okhttp3.TraceContextCall$TraceContextCallback.onResponse(TraceContextCall.java:95)
    at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
The credentials are set correctly, but under the hood Hadoop first asks MinIO whether the object is a directory (a check we don't need), and that request is denied.
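The failure can be reproduced without Spark, directly against the Hadoop FileSystem API, since the stack trace goes through S3AFileSystem.isDirectory (again a sketch, with placeholder endpoint and credentials):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Placeholder endpoint and credentials for a MinIO deployment.
Configuration conf = new Configuration();
conf.set("fs.s3a.endpoint", "https://minio.example.com");
conf.set("fs.s3a.path.style.access", "true");
conf.set("fs.s3a.access.key", "<access-key>");
conf.set("fs.s3a.secret.key", "<secret-key>");

FileSystem fs = FileSystem.get(URI.create("s3a://minio-bucket/"), conf);

// Mirrors what Spark does via FileStreamSink.hasMetadata: the directory probe
// issues requests beyond GetObject, which the restrictive policy denies.
// Throws java.nio.file.AccessDeniedException, as in the trace above.
boolean isDir = fs.isDirectory(new Path("s3a://minio-bucket/object"));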
We can retrieve the object if we change MinIO's policy to something like the following, but this isn't an option for us:
{ "statement": [ { "effect": "Allow", "action": [ "s3:GetObject" ], "resource": [ "arn:aws:s3:::minio-bucket/object" ] }, { "effect": "Allow", "action": [ "s3:ListBucket" ], "resource": [ "arn:aws:s3:::minio-bucket/" ], "condition": { "StringLike": { "s3:prefix": [ "object", "object/" ] } } } ] }
We couldn't find any way to configure Hadoop so that it simply attempts to retrieve the object. Reading HADOOP-17454, it seems it could be possible to provide options to fine-tune Hadoop's behaviour.
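For illustration, S3A already exposes a switch of this kind for a different probe, assuming Hadoop 3.3's fs.s3a.bucket.probe option; it disables the bucket-existence check at filesystem creation, but does not affect the getFileStatus/isDirectory probes that fail here:

// Existing fine-tuning example: skip the bucket existence probe when the
// S3A filesystem is created. Something analogous for the file-status probes
// is what we're looking for; this particular option does NOT fix our case.
conf.set("fs.s3a.bucket.probe", "0");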
Are there such options? If not, would this be a reasonable behaviour to add?
Regards,
Sébastien
Please note this is my first time here: I hope I picked the right project, issue type, and priority (I did my best to look around). If not, I'm very sorry about that.