Details
-
Sub-task
-
Status: Open
-
Major
-
Resolution: Unresolved
-
3.0.0-beta1, 3.0.0-alpha4
-
None
-
None
Description
NativeAzureFileSystem instances are associated with the blob container used to initialize the file system. Assume a file system instance fs is associated with container A. When it is asked to access a blob inside another container B, fs still looks for the blob inside container A. If there happen to be two blobs with the same name in both containers, the user may get a wrong result, because fs reads the contents of the blob in container A instead of the one in container B.
You can reproduce it by running the following self-contained Scala script with Ammonite:
#!/usr/bin/env amm --no-remote-logging

// Reproduces: a NativeAzureFileSystem bound to one container silently reads a blob with the
// same name from that container when given a path that points into a *different* container.

import $ivy.`com.jsuereth::scala-arm:2.0`
import $ivy.`com.microsoft.azure:azure-storage:5.2.0`
import $ivy.`org.apache.hadoop:hadoop-azure:3.0.0-alpha4`
import $ivy.`org.apache.hadoop:hadoop-common:3.0.0-alpha4`
import $ivy.`org.scalatest::scalatest:3.0.3`

import java.io.{BufferedReader, InputStreamReader}
import java.net.URI
import java.time.{Duration, Instant}
import java.util.{Date, EnumSet}

// The structural type below (`{ def close() }`) is invoked via reflection; this import
// acknowledges that and silences the feature warning.
import scala.language.reflectiveCalls

import com.microsoft.azure.storage.{CloudStorageAccount, StorageCredentialsAccountAndKey}
import com.microsoft.azure.storage.blob.{SharedAccessBlobPermissions, SharedAccessBlobPolicy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.azure.{AzureException, NativeAzureFileSystem}
import org.scalatest.Assertions._
import resource._

// Implicit scala-arm `Resource` instance for any type exposing a `close()` method, so such
// values can be wrapped with `managed(...)` and closed automatically.
implicit def `Closable->Resource`[T <: { def close() }]: Resource[T] = new Resource[T] {
  override def close(closable: T): Unit = closable.close()
}

// Credentials information
val ACCOUNT = "** REDACTED **"
val ACCESS_KEY = "** REDACTED **"

// We'll create two different containers, both containing a blob named "test-blob" but with
// different contents.
val CONTAINER_A = "container-a"
val CONTAINER_B = "container-b"
val TEST_BLOB = "test-blob"

val blobClient = {
  val credentials = new StorageCredentialsAccountAndKey(ACCOUNT, ACCESS_KEY)
  val account = new CloudStorageAccount(credentials, /* useHttps */ true)
  account.createCloudBlobClient()
}

// Generates a read-only (READ + LIST) SAS key restricted to "container-a", valid from
// 10 minutes ago for one hour.
val sasKeyForContainerA = {
  val since = Instant.now() minus Duration.ofMinutes(10)
  val duration = Duration.ofHours(1)
  val policy = new SharedAccessBlobPolicy()

  policy.setSharedAccessStartTime(Date.from(since))
  policy.setSharedAccessExpiryTime(Date.from(since plus duration))
  policy.setPermissions(EnumSet.of(
    SharedAccessBlobPermissions.READ,
    SharedAccessBlobPermissions.LIST
  ))

  blobClient
    .getContainerReference(CONTAINER_A)
    .generateSharedAccessSignature(policy, null)
}

// Sets up testing containers and blobs using the Azure storage SDK:
//
//   container-a/test-blob => "foo"
//   container-b/test-blob => "bar"
{
  val containerARef = blobClient.getContainerReference(CONTAINER_A)
  val containerBRef = blobClient.getContainerReference(CONTAINER_B)

  containerARef.createIfNotExists()
  containerARef.getBlockBlobReference(TEST_BLOB).uploadText("foo")

  containerBRef.createIfNotExists()
  containerBRef.getBlockBlobReference(TEST_BLOB).uploadText("bar")
}

val pathA = new Path(s"wasbs://$CONTAINER_A@$ACCOUNT.blob.core.windows.net/$TEST_BLOB")
val pathB = new Path(s"wasbs://$CONTAINER_B@$ACCOUNT.blob.core.windows.net/$TEST_BLOB")

for {
  // Creates a file system associated with "container-a".
  fs <- managed {
    val conf = new Configuration
    conf.set("fs.wasbs.impl", classOf[NativeAzureFileSystem].getName)
    conf.set(s"fs.azure.sas.$CONTAINER_A.$ACCOUNT.blob.core.windows.net", sasKeyForContainerA)
    pathA.getFileSystem(conf)
  }

  // Opens a reader pointing to "container-a/test-blob". We expect to get the string "foo"
  // written to this blob previously.
  readerA <- managed(new BufferedReader(new InputStreamReader(fs open pathA)))
} {
  // Should get "foo"
  assert(readerA.readLine() == "foo")

  // Accessing "container-b/test-blob" should fail, since the SAS key used to create the
  // `FileSystem` instance is restricted to "container-a". The reader for pathB is opened
  // *inside* `intercept` on purpose: the failure may already be raised by `fs open pathB`
  // (i.e. during resource acquisition). If that open happened in the `for` header above, the
  // exception would escape `intercept` and the script would crash instead of passing.
  intercept[AzureException] {
    for (readerB <- managed(new BufferedReader(new InputStreamReader(fs open pathB)))) {
      // ... but instead, we get the string "foo" here, which indicates that readerB was
      // reading from "container-a" instead of "container-b".
      val contents = readerB.readLine()
      println(s"Should not reach here but we got $contents")
    }
  }
}