Uploaded image for project: 'Apache Arrow'
  1. Apache Arrow
  2. ARROW-14553 [Doc] Java Cookbook Release 1
  3. ARROW-15703

[Java]: Create custom sphinx plugin to help us with java verbose code to showcase highlighting code

    XMLWordPrintableJSON

Details

    • Sub-task
    • Status: Closed
    • Minor
    • Resolution: Later
    • None
    • None
    • None
    • None

    Description

      We are running java cookbook code thru sphinx using our custom extension https://github.com/apache/arrow-cookbook/blob/main/java/ext/javadoctest.py 

       

      We need to create another extension to only show our end user the java code that is needed to showcase but running the whole java code cookbook at testing part.

       

      Current documentation:

      Validate Delete Data
      ********************
      
      And confirm that it's been deleted:
      
      .. testcode::
      
          import org.apache.arrow.flight.Action;
          import org.apache.arrow.flight.AsyncPutListener;
          import org.apache.arrow.flight.Criteria;
          import org.apache.arrow.flight.FlightClient;
          import org.apache.arrow.flight.FlightDescriptor;
          import org.apache.arrow.flight.FlightEndpoint;
          import org.apache.arrow.flight.FlightInfo;
          import org.apache.arrow.flight.FlightServer;
          import org.apache.arrow.flight.FlightStream;
          import org.apache.arrow.flight.Location;
          import org.apache.arrow.flight.NoOpFlightProducer;
          import org.apache.arrow.flight.PutResult;
          import org.apache.arrow.flight.Result;
          import org.apache.arrow.flight.Ticket;
          import org.apache.arrow.memory.RootAllocator;
          import org.apache.arrow.vector.VarCharVector;
          import org.apache.arrow.vector.VectorSchemaRoot;
          import org.apache.arrow.vector.VectorUnloader;
          import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
          import org.apache.arrow.vector.types.pojo.ArrowType;
          import org.apache.arrow.vector.types.pojo.Field;
          import org.apache.arrow.vector.types.pojo.FieldType;
          import org.apache.arrow.vector.types.pojo.Schema;
      
          import java.io.IOException;
          import java.nio.charset.StandardCharsets;
          import java.util.ArrayList;
          import java.util.Arrays;
          import java.util.Collections;
          import java.util.HashMap;
          import java.util.Iterator;
          import java.util.List;
          import java.util.Map;
      
          class DataInMemory {
              private List<ArrowRecordBatch> listArrowRecordBatch;
              private Schema schema;
              private Long rows;
              public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch, Schema schema, Long rows) {
                  this.listArrowRecordBatch = listArrowRecordBatch;
                  this.schema = schema;
                  this.rows = rows;
              }
              public List<ArrowRecordBatch> getListArrowRecordBatch() {
                  return listArrowRecordBatch;
              }
              public Schema getSchema() {
                  return schema;
              }
              public Long getRows() {
                  return rows;
              }
          }
      
          // Server
          Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
          Map<FlightDescriptor, DataInMemory> dataInMemory = new HashMap<>();
          Map<String, DataInMemory> mapPojoFlightDataInMemory = new HashMap<>();
          List<ArrowRecordBatch> listArrowRecordBatch = new ArrayList<>();
          try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
              FlightServer flightServer = FlightServer.builder(allocator, location, new NoOpFlightProducer(){
                  @Override
                  public Runnable acceptPut(CallContext context, FlightStream flightStream, StreamListener<PutResult> ackStream) {
                      return () -> {
                          long rows = 0;
                          while (flightStream.next()) {
                              VectorUnloader unloader = new VectorUnloader(flightStream.getRoot());
                              try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
                                  // Retain data information
                                  listArrowRecordBatch.add(arb);
                                  rows = rows + flightStream.getRoot().getRowCount();
                              }
                          }
                          long finalRows = rows;
                          DataInMemory pojoFlightDataInMemory = new DataInMemory(listArrowRecordBatch, flightStream.getSchema(), finalRows);
                          dataInMemory.put(flightStream.getDescriptor(), pojoFlightDataInMemory);
                          ackStream.onCompleted();
                      };
                  }
      
                  @Override
                  public void doAction(CallContext context, Action action, StreamListener<Result> listener) {
                      FlightDescriptor flightDescriptor = FlightDescriptor.path(new String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for key configured
                      if(dataInMemory.containsKey(flightDescriptor)) {
                          switch (action.getType()) {
                              case "DELETE":
                                  dataInMemory.remove(flightDescriptor);
                                  Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8));
                                  listener.onNext(result);
                          }
                          listener.onCompleted();
                      }
                  }
      
                  @Override
                  public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
                      if(!dataInMemory.containsKey(descriptor)){
                          throw new IllegalStateException("Unknown descriptor.");
                      }
                      return new FlightInfo(
                              dataInMemory.get(descriptor).getSchema(),
                              descriptor,
                              Collections.singletonList(new FlightEndpoint(new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)), location)), // Configure a key to map back and forward your data using Ticket argument
                              allocator.getAllocatedMemory(),
                              dataInMemory.get(descriptor).getRows()
                      );
                  }
      
                  @Override
                  public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
                      dataInMemory.forEach((k, v) -> {
                          FlightInfo flightInfo = getFlightInfo(null, k);
                          listener.onNext(flightInfo);
                          }
                      );
                      listener.onCompleted();
                  }
              }).build();
              try {
                  flightServer.start();
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
      
          // Client
          try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
              // Populate data
              FlightClient flightClient = FlightClient.builder(allocator, location).build();
              Schema schema = new Schema(Arrays.asList( new Field("name", FieldType.nullable(new ArrowType.Utf8()), null)));
              VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
              VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name");
              varCharVector.allocateNew(3);
              varCharVector.set(0, "Ronald".getBytes());
              varCharVector.set(1, "David".getBytes());
              varCharVector.set(2, "Francisco".getBytes());
              varCharVector.setValueCount(3);
              vectorSchemaRoot.setRowCount(3);
              FlightClient.ClientStreamListener listener = flightClient.startPut(FlightDescriptor.path("profiles"), vectorSchemaRoot, new AsyncPutListener());
              listener.putNext();
              vectorSchemaRoot.allocateNew();
              varCharVector.set(0, "Manuel".getBytes());
              varCharVector.set(1, "Felipe".getBytes());
              varCharVector.set(2, "JJ".getBytes());
              varCharVector.setValueCount(3);
              vectorSchemaRoot.setRowCount(3);
              listener.putNext();
              vectorSchemaRoot.clear();
              listener.completed();
              listener.getResult();
      
              // Do delete action
              Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE", FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8) ));
              while(deleteActionResult.hasNext()){
                  Result result = deleteActionResult.next();
                  System.out.println("Do Delete Action: " + new String(result.getBody(), StandardCharsets.UTF_8));
              }
      
              // Get all metadata information
              Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
              flightInfos.forEach(t -> System.out.println(t));
              System.out.println("List Flights Info (after delete): No records");
          }
      
      .. testoutput::
      
          Do Delete Action: Delete completed
          List Flights Info (after delete): No records 

       

      How it could be:

      Only offer to the user the main code but running behind scene all the code needed

      // Server
      @Override
      public void doAction(CallContext context, Action action, StreamListener<Result> listener) {
          FlightDescriptor flightDescriptor = FlightDescriptor.path(new String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for key configured
          if(dataInMemory.containsKey(flightDescriptor)) {
              switch (action.getType()) {
                  case "DELETE":
                      dataInMemory.remove(flightDescriptor);
                      Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8));
                      listener.onNext(result);
              }
              listener.onCompleted();
          }
      }
      
      // Client
      // Do delete action
      Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE", FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8) ));
      while(deleteActionResult.hasNext()){
          Result result = deleteActionResult.next();
          System.out.println("Do Delete Action: " + new String(result.getBody(), StandardCharsets.UTF_8));
      }
      
      // Get all metadata information
      Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
      flightInfos.forEach(t -> System.out.println(t));
      System.out.println("List Flights Info (after delete): No records");
       

      Attachments

        Activity

          People

            dsusanibara David Dali Susanibar Arce
            dsusanibara David Dali Susanibar Arce
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: