Details
-
Bug
-
Status: Open
-
P3
-
Resolution: Unresolved
-
None
-
None
-
None
Description
The user has a job that reads a few hundred thousand files and then writes them to BigQuery. This generates 1 temp file per input file. Then we gather the temp files into a View.asList() side input - and this side input ends up containing a few hundred thousand tiny ISM files, with 1 element per file, which performs horribly (taking hours to read the side input).
I think we need to reshuffle things onto a reasonable number of shards before writing them to ISM.
A side issue: this line (https://github.com/apache/beam/blob/v2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java#L46) also triggers the coder size estimation logic, which wrongly assumes that size estimation is cheap in this case and so does double the work, as evidenced by the following stack trace:
Processing lull for PT30900.015S in state process of WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:170)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.read(InputRecord.java:503)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
java.io.BufferedInputStream.read(BufferedInputStream.java:345)
sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:380)
com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:4784)
com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStreamAndSetMetadata(GoogleCloudStorageReadChannel.java:656)
com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:560)
com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:289)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
java.io.InputStream.read(InputStream.java:101)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:81)
org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:79)
org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:63)
org.apache.beam.runners.dataflow.internal.IsmFormat$KeyPrefixCoder.decode(IsmFormat.java:694)
com.google.cloud.dataflow.worker.IsmReader.readKey(IsmReader.java:999)
com.google.cloud.dataflow.worker.IsmReader.access$2000(IsmReader.java:79)
com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.advance(IsmReader.java:952)
com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.start(IsmReader.java:942)
com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:580)
com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:569)
com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:554)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
com.google.cloud.dataflow.worker.IsmReader.fetch(IsmReader.java:605)
com.google.cloud.dataflow.worker.IsmReader.getBlock(IsmReader.java:770)
com.google.cloud.dataflow.worker.IsmReader.access$1000(IsmReader.java:79)
com.google.cloud.dataflow.worker.IsmReader$IsmPrefixReaderIterator.get(IsmReader.java:641)
com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.getUsingLong(IsmSideInputReader.java:674)
com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.access$1300(IsmSideInputReader.java:620)
com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators$ListIteratorOverReaderIterators.next(IsmSideInputReader.java:715)
java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1042)
org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:195)
org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:685)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:599)
com.google.cloud.dataflow.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:520)
com.google.cloud.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:134)
com.google.cloud.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:63)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)