Log workAgile BoardRank to TopRank to BottomAttach filesAttach ScreenshotBulk Copy AttachmentsBulk Move AttachmentsAdd voteVotersWatch issueWatchersConvert to IssueMoveLinkCloneLabelsUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Sub-task
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 3.0.0-beta1, 3.0.0-alpha4
    • None
    • fs
    • None

    Description

      NativeAzureFileSystem instances are associated with the blob container used to initialize the file system. Assuming that a file system instance fs is associated with a container A, when trying to access a blob inside another container B, fs still tries to find the blob inside container A. If there happens to be two blobs with the same name inside both containers, the user may get a wrong result because fs reads the contents from the blob inside container A instead of container B.

      You may reproduce it by running the following self-contained Scala script using Ammonite:

      #!/usr/bin/env amm --no-remote-logging
      
      import $ivy.`com.jsuereth::scala-arm:2.0`
      import $ivy.`com.microsoft.azure:azure-storage:5.2.0`
      import $ivy.`org.apache.hadoop:hadoop-azure:3.0.0-alpha4`
      import $ivy.`org.apache.hadoop:hadoop-common:3.0.0-alpha4`
      import $ivy.`org.scalatest::scalatest:3.0.3`
      
      import java.io.{BufferedReader, InputStreamReader}
      import java.net.URI
      import java.time.{Duration, Instant}
      import java.util.{Date, EnumSet}
      
      import com.microsoft.azure.storage.{CloudStorageAccount, StorageCredentialsAccountAndKey}
      import com.microsoft.azure.storage.blob.{SharedAccessBlobPermissions, SharedAccessBlobPolicy}
      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.{FileSystem, Path}
      import org.apache.hadoop.fs.azure.{AzureException, NativeAzureFileSystem}
      import org.scalatest.Assertions._
      import resource._
      
      // Utility implicit conversion for auto resource management.
      implicit def `Closable->Resource`[T <: { def close() }]: Resource[T] = new Resource[T] {
        override def close(closable: T): Unit = closable.close()
      }
      
      // Credentials information
      val ACCOUNT = "** REDACTED **"
      val ACCESS_KEY = "** REDACTED **"
      
      // We'll create two different containers, both contain a blob named "test-blob" but with different
      // contents.
      val CONTAINER_A = "container-a"
      val CONTAINER_B = "container-b"
      val TEST_BLOB = "test-blob"
      
      val blobClient = {
        val credentials = new StorageCredentialsAccountAndKey(ACCOUNT, ACCESS_KEY)
        val account = new CloudStorageAccount(credentials, /* useHttps */ true)
        account.createCloudBlobClient()
      }
      
      // Generates a read-only SAS key restricted within "container-a".
      val sasKeyForContainerA = {
        val since = Instant.now() minus Duration.ofMinutes(10)
        val duration = Duration.ofHours(1)
        val policy = new SharedAccessBlobPolicy()
      
        policy.setSharedAccessStartTime(Date.from(since))
        policy.setSharedAccessExpiryTime(Date.from(since plus duration))
        policy.setPermissions(EnumSet.of(
          SharedAccessBlobPermissions.READ,
          SharedAccessBlobPermissions.LIST
        ))
      
        blobClient
          .getContainerReference(CONTAINER_A)
          .generateSharedAccessSignature(policy, null)
      }
      
      // Sets up testing containers and blobs using the Azure storage SDK:
      //
      //   container-a/test-blob => "foo"
      //   container-b/test-blob => "bar"
      {
        val containerARef = blobClient.getContainerReference(CONTAINER_A)
        val containerBRef = blobClient.getContainerReference(CONTAINER_B)
      
        containerARef.createIfNotExists()
        containerARef.getBlockBlobReference(TEST_BLOB).uploadText("foo")
      
        containerBRef.createIfNotExists()
        containerBRef.getBlockBlobReference(TEST_BLOB).uploadText("bar")
      }
      
      val pathA = new Path(s"wasbs://$CONTAINER_A@$ACCOUNT.blob.core.windows.net/$TEST_BLOB")
      val pathB = new Path(s"wasbs://$CONTAINER_B@$ACCOUNT.blob.core.windows.net/$TEST_BLOB")
      
      for {
        // Creates a file system associated with "container-a".
        fs <- managed {
          val conf = new Configuration
          conf.set("fs.wasbs.impl", classOf[NativeAzureFileSystem].getName)
          conf.set(s"fs.azure.sas.$CONTAINER_A.$ACCOUNT.blob.core.windows.net", sasKeyForContainerA)
          pathA.getFileSystem(conf)
        }
      
        // Opens a reader pointing to "container-a/test-blob". We expect to get the string "foo" written
        // to this blob previously.
        readerA <- managed(new BufferedReader(new InputStreamReader(fs open pathA)))
      
        // Opens a reader pointing to "container-b/test-blob". We expect to get an exception since the SAS
        // key used to create the `FileSystem` instance is restricted to "container-a".
        readerB <- managed(new BufferedReader(new InputStreamReader(fs open pathB)))
      } {
        // Should get "foo"
        assert(readerA.readLine() == "foo")
      
        // Should catch an exception ...
        intercept[AzureException] {
          // ... but instead, we get string "foo" here, which indicates that the readerB was reading from
          // "container-a" instead of "container-b".
          val contents = readerB.readLine()
          println(s"Should not reach here but we got $contents")
        }
      }
      

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            Unassigned Unassigned Assign to me
            lian cheng Cheng Lian

            Dates

              Created:
              Updated:

              Slack

                Issue deployment