Beam / BEAM-3778

Very poor performance of side inputs when input is finely sharded

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Component/s: runner-dataflow

    Description

      This thread:
      https://lists.apache.org/thread.html/324a4f86e567e3e1692466e70f44a08276123b467bacb2ecbf00515f@%3Cuser.beam.apache.org%3E

      The user has a job that reads a few hundred thousand files and writes them to BigQuery, which generates one temp file per input file. We then gather the temp files into a View.asList() side input, and this side input ends up containing a few hundred thousand tiny ISM files with one element per file. Reading it performs horribly (the side input takes hours to read).

      I think we need to reshuffle things onto a reasonable number of shards before writing them to ISM.
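
      To make the proposal concrete, here is a minimal sketch in plain Java of what "a reasonable number of shards" means. It is not Beam or Dataflow code: the class ShardRebalanceSketch, the method rebalance, and the target of 64 shards are all hypothetical, and the round-robin assignment only stands in for whatever redistribution the runner would actually perform before writing ISM files.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration only; not part of Beam or the Dataflow worker. */
public class ShardRebalanceSketch {

    /**
     * Redistributes the elements of many tiny shards onto at most
     * targetShards output shards, assigning elements round-robin.
     */
    public static <T> List<List<T>> rebalance(List<List<T>> tinyShards, int targetShards) {
        if (targetShards <= 0) {
            throw new IllegalArgumentException("targetShards must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < targetShards; i++) {
            out.add(new ArrayList<>());
        }
        long index = 0;
        for (List<T> shard : tinyShards) {
            for (T element : shard) {
                out.get((int) (index % targetShards)).add(element);
                index++;
            }
        }
        // Drop shards that received nothing (fewer elements than targetShards).
        out.removeIf(List::isEmpty);
        return out;
    }

    public static void main(String[] args) {
        // One single-element shard per input file, as in the reported job.
        List<List<String>> tiny = new ArrayList<>();
        for (int i = 0; i < 300_000; i++) {
            tiny.add(Collections.singletonList("temp-file-" + i));
        }
        List<List<String>> merged = rebalance(tiny, 64); // 64 is an assumed target
        System.out.println(tiny.size() + " shards -> " + merged.size() + " shards");
    }
}
```

      With a bounded shard count, a reader opens a few dozen files rather than one file per element, which is the cost the stack trace in this report points at.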

      A side issue: this line, https://github.com/apache/beam/blob/v2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java#L46, also triggers the coder size estimation logic, which wrongly assumes that size estimation is cheap in this case and so does double the work, as evidenced by the following stack trace:

      Processing lull for PT30900.015S in state process of WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
      java.net.SocketInputStream.socketRead0(Native Method)
      java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
      java.net.SocketInputStream.read(SocketInputStream.java:170)
      java.net.SocketInputStream.read(SocketInputStream.java:141)
      sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
      sun.security.ssl.InputRecord.read(InputRecord.java:503)
      sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
      sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
      sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
      java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
      java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
      java.io.BufferedInputStream.read(BufferedInputStream.java:345)
      sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
      sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
      sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
      sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
      java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
      sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
      com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
      com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
      com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
      com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
      com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
      com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:380)
      com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:4784)
      com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStreamAndSetMetadata(GoogleCloudStorageReadChannel.java:656)
      com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:560)
      com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:289)
      sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
      sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
      sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
      java.io.InputStream.read(InputStream.java:101)
      sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:81)
      org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:79)
      org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:63)
      org.apache.beam.runners.dataflow.internal.IsmFormat$KeyPrefixCoder.decode(IsmFormat.java:694)
      com.google.cloud.dataflow.worker.IsmReader.readKey(IsmReader.java:999)
      com.google.cloud.dataflow.worker.IsmReader.access$2000(IsmReader.java:79)
      com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.advance(IsmReader.java:952)
      com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.start(IsmReader.java:942)
      com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:580)
      com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:569)
      com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:554)
      com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
      com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
      com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
      com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
      com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
      com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
      com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
      com.google.cloud.dataflow.worker.IsmReader.fetch(IsmReader.java:605)
      com.google.cloud.dataflow.worker.IsmReader.getBlock(IsmReader.java:770)
      com.google.cloud.dataflow.worker.IsmReader.access$1000(IsmReader.java:79)
      com.google.cloud.dataflow.worker.IsmReader$IsmPrefixReaderIterator.get(IsmReader.java:641)
      com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.getUsingLong(IsmSideInputReader.java:674)
      com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.access$1300(IsmSideInputReader.java:620)
      com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators$ListIteratorOverReaderIterators.next(IsmSideInputReader.java:715)
      java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1042)
      org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:195)
      org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
      org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:685)
      org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:599)
      com.google.cloud.dataflow.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:520)
      com.google.cloud.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:134)
      com.google.cloud.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:63)
      com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
      com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
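
      The trace above shows IterableLikeCoder.registerByteSizeObserver iterating the side-input-backed list while the output counter is updated. The plain-Java sketch below illustrates why that doubles the work when every element access is an expensive read. All names in it (DoubleTraversalSketch, RemoteBackedIterable, estimateSizeBytes, consume) are hypothetical, and the counter merely simulates remote reads; it is not the Dataflow worker's actual logic.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration only; not Beam or Dataflow worker code. */
public class DoubleTraversalSketch {

    /** Iterable whose every element access simulates one expensive remote read. */
    static final class RemoteBackedIterable implements Iterable<Integer> {
        private final int size;
        final AtomicInteger remoteReads = new AtomicInteger();

        RemoteBackedIterable(int size) {
            this.size = size;
        }

        @Override
        public Iterator<Integer> iterator() {
            return new Iterator<Integer>() {
                private int next = 0;

                @Override
                public boolean hasNext() {
                    return next < size;
                }

                @Override
                public Integer next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    remoteReads.incrementAndGet(); // stands in for a remote fetch
                    return next++;
                }
            };
        }
    }

    /** Size estimation that walks every element, as an eager observer would. */
    static long estimateSizeBytes(Iterable<Integer> values) {
        long bytes = 0;
        for (Integer ignored : values) {
            bytes += Integer.BYTES;
        }
        return bytes;
    }

    /** The downstream consumer, which has to walk the elements again. */
    static long consume(Iterable<Integer> values) {
        long sum = 0;
        for (Integer v : values) {
            sum += v;
        }
        return sum;
    }

    /** Returns the number of simulated remote reads for estimate plus consume. */
    public static int readsForEstimateThenConsume(int elements) {
        RemoteBackedIterable values = new RemoteBackedIterable(elements);
        estimateSizeBytes(values);
        consume(values);
        return values.remoteReads.get();
    }

    public static void main(String[] args) {
        System.out.println("remote reads: " + readsForEstimateThenConsume(1000));
    }
}
```

      Skipping the estimation pass when the iterable is lazily backed would leave a single traversal, which is the behavior the description argues for.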

          People

            Assignee: Unassigned
            Reporter: Eugene Kirpichov (jkff)