Uploaded image for project: 'Apache Arrow'
  1. Apache Arrow
  2. ARROW-522

[Java] VectorLoader throws exception data schema contains list of maps.

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 0.1.0
    • Fix Version/s: None
    • Component/s: Java
    • Labels:
      None

      Description

      I encountered this exception

      Exception in thread "main" java.lang.IllegalArgumentException: should have as many children as in the schema: found 0 expected 2
          at com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
          at org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:91)
          at org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:95)
          at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:69)
      

      The test code is

      public class ArrowTest {
          public static class ByteArrayReadableSeekableByteChannel implements SeekableByteChannel {
              private byte[] byteArray;
              private int position = 0;
      
              public ByteArrayReadableSeekableByteChannel(byte[] byteArray) {
                  if (byteArray == null) {
                      throw new NullPointerException();
                  }
                  this.byteArray = byteArray;
              }
      
              @Override
              public boolean isOpen() {
                  return byteArray != null;
              }
      
              @Override
              public void close() throws IOException {
                  byteArray = null;
              }
      
              @Override
              public int read(final ByteBuffer dst) throws IOException {
                  int remainingInBuf = byteArray.length - this.position;
                  int length = Math.min(dst.remaining(), remainingInBuf);
                  dst.put(this.byteArray, this.position, length);
                  this.position += length;
                  return length;
              }
      
              @Override
              public long position() throws IOException {
                  return this.position;
              }
      
              @Override
              public SeekableByteChannel position(final long newPosition) throws IOException {
                  this.position = (int) newPosition;
                  return this;
              }
      
              @Override
              public long size() throws IOException {
                  return this.byteArray.length;
              }
      
              @Override
              public int write(final ByteBuffer src) throws IOException {
                  throw new UnsupportedOperationException("Read only");
              }
      
              @Override
              public SeekableByteChannel truncate(final long size) throws IOException {
                  throw new UnsupportedOperationException("Read only");
              }
          }
      
          public static void main(String[] argv) throws Exception {
              ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
      
              // write
              try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
                      BufferAllocator originalVectorAllocator = allocator
                              .newChildAllocator("child allocator", 1024, Integer.MAX_VALUE);
                      MapVector parent = new MapVector("parent", originalVectorAllocator, null)
              ) {
                  writeData(10, parent);
                  write(parent.getChild("root"), Channels.newChannel(byteArrayOutputStream));
              }
      
              byte[] data = byteArrayOutputStream.toByteArray();
      
              // read
              try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
                      BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
                      ArrowReader arrowReader = new ArrowReader(new ByteArrayReadableSeekableByteChannel(data),
                              readerAllocator);
                      BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
                      MapVector parent = new MapVector("parent", vectorAllocator, null)
              ) {
                  ArrowFooter footer = arrowReader.readFooter();
                  Schema schema = footer.getSchema();
      
                  NullableMapVector root = parent.addOrGet("root", Types.MinorType.MAP, NullableMapVector.class);
      
                  VectorLoader vectorLoader = new VectorLoader(schema, root);
      
                  List<ArrowBlock> recordBatches = footer.getRecordBatches();
      
                  for (ArrowBlock rbBlock : recordBatches) {
                      try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
                          vectorLoader.load(recordBatch);
                      }
      
                      readData(10, parent);
                  }
              }
          }
      
          private static void writeData(int count, MapVector parent) throws Exception {
              BaseWriter.ComplexWriter writer = new ComplexWriterImpl("root", parent, true);
              BaseWriter.MapWriter rootWriter = writer.rootAsMap();
              IntWriter intWriter = rootWriter.integer("id");
              BaseWriter.ListWriter listWriter = rootWriter.list("list");
              BaseWriter.MapWriter mapFromList = listWriter.map();
              for (int i = 0; i < count; i++) {
                  rootWriter.start();
                  intWriter.setPosition(i);
                  intWriter.writeInt(i);
      
                  listWriter.setPosition(i);
                  listWriter.startList();
                  for (int j = 0; j < 2; j++) {
                      mapFromList.start();
                      mapFromList.integer("type").writeInt(j);
                      mapFromList.bigInt("id").writeBigInt(j * 1000L);
                      mapFromList.end();
                  }
                  listWriter.endList();
                  rootWriter.end();
              }
              writer.setValueCount(count);
          }
      
          private static void readData(int count, MapVector parent) {
              BaseReader.MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
              FieldReader listReader = rootReader.reader("list");
              for (int i = 0; i < count; i++) {
                  listReader.setPosition(i);
                  while (listReader.next()) {
                      System.out.println(i + " id " + listReader.reader().reader("id").readLong());
                      System.out.println(i + " type " + listReader.reader().reader("type").readInteger());
                  }
              }
          }
      
          private static void write(FieldVector parent, WritableByteChannel out) throws IOException {
              VectorUnloader vectorUnloader = new VectorUnloader(parent);
              Schema schema = vectorUnloader.getSchema();
              try (
                      ArrowWriter arrowWriter = new ArrowWriter(out, schema);
                      ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
              ) {
                  arrowWriter.writeRecordBatch(recordBatch);
              }
          }
      

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              rleiwang Rock Wang
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated: