Details
-
Sub-task
-
Status: Closed
-
Minor
-
Resolution: Later
-
None
-
None
-
None
-
None
Description
We are running java cookbook code thru sphinx using our custom extension https://github.com/apache/arrow-cookbook/blob/main/java/ext/javadoctest.py
We need to create another extension to only show our end user the java code that is needed to showcase but running the whole java code cookbook at testing part.
Current documentation:
Validate Delete Data ******************** And confirm that it's been deleted: .. testcode:: import org.apache.arrow.flight.Action; import org.apache.arrow.flight.AsyncPutListener; import org.apache.arrow.flight.Criteria; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightDescriptor; import org.apache.arrow.flight.FlightEndpoint; import org.apache.arrow.flight.FlightInfo; import org.apache.arrow.flight.FlightServer; import org.apache.arrow.flight.FlightStream; import org.apache.arrow.flight.Location; import org.apache.arrow.flight.NoOpFlightProducer; import org.apache.arrow.flight.PutResult; import org.apache.arrow.flight.Result; import org.apache.arrow.flight.Ticket; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; class DataInMemory { private List<ArrowRecordBatch> listArrowRecordBatch; private Schema schema; private Long rows; public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch, Schema schema, Long rows) { this.listArrowRecordBatch = listArrowRecordBatch; this.schema = schema; this.rows = rows; } public List<ArrowRecordBatch> getListArrowRecordBatch() { return listArrowRecordBatch; } public Schema getSchema() { return schema; } public Long getRows() { return rows; } } // Server Location location = Location.forGrpcInsecure("0.0.0.0", 33333); Map<FlightDescriptor, DataInMemory> dataInMemory = new HashMap<>(); Map<String, DataInMemory> mapPojoFlightDataInMemory = new HashMap<>(); List<ArrowRecordBatch> listArrowRecordBatch = new ArrayList<>(); try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){ FlightServer flightServer = FlightServer.builder(allocator, location, new NoOpFlightProducer(){ @Override public Runnable acceptPut(CallContext context, FlightStream flightStream, StreamListener<PutResult> ackStream) { return () -> { long rows = 0; while (flightStream.next()) { VectorUnloader unloader = new VectorUnloader(flightStream.getRoot()); try (final ArrowRecordBatch arb = unloader.getRecordBatch()) { // Retain data information listArrowRecordBatch.add(arb); rows = rows + flightStream.getRoot().getRowCount(); } } long finalRows = rows; DataInMemory pojoFlightDataInMemory = new DataInMemory(listArrowRecordBatch, flightStream.getSchema(), finalRows); dataInMemory.put(flightStream.getDescriptor(), pojoFlightDataInMemory); ackStream.onCompleted(); }; } @Override public void doAction(CallContext context, Action action, StreamListener<Result> listener) { FlightDescriptor flightDescriptor = FlightDescriptor.path(new String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for key configured if(dataInMemory.containsKey(flightDescriptor)) { switch (action.getType()) { case "DELETE": dataInMemory.remove(flightDescriptor); Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8)); listener.onNext(result); } listener.onCompleted(); } } @Override public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { if(!dataInMemory.containsKey(descriptor)){ throw new IllegalStateException("Unknown descriptor."); } return new FlightInfo( dataInMemory.get(descriptor).getSchema(), descriptor, Collections.singletonList(new FlightEndpoint(new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)), location)), // Configure a key to map back and forward your data using Ticket argument allocator.getAllocatedMemory(), dataInMemory.get(descriptor).getRows() ); } @Override public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) { dataInMemory.forEach((k, v) -> { FlightInfo flightInfo = getFlightInfo(null, k); listener.onNext(flightInfo); } ); listener.onCompleted(); } }).build(); try { flightServer.start(); } catch (IOException e) { e.printStackTrace(); } } // Client try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){ // Populate data FlightClient flightClient = FlightClient.builder(allocator, location).build(); Schema schema = new Schema(Arrays.asList( new Field("name", FieldType.nullable(new ArrowType.Utf8()), null))); VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator); VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name"); varCharVector.allocateNew(3); varCharVector.set(0, "Ronald".getBytes()); varCharVector.set(1, "David".getBytes()); varCharVector.set(2, "Francisco".getBytes()); varCharVector.setValueCount(3); vectorSchemaRoot.setRowCount(3); FlightClient.ClientStreamListener listener = flightClient.startPut(FlightDescriptor.path("profiles"), vectorSchemaRoot, new AsyncPutListener()); listener.putNext(); vectorSchemaRoot.allocateNew(); varCharVector.set(0, "Manuel".getBytes()); varCharVector.set(1, "Felipe".getBytes()); varCharVector.set(2, "JJ".getBytes()); varCharVector.setValueCount(3); vectorSchemaRoot.setRowCount(3); listener.putNext(); vectorSchemaRoot.clear(); listener.completed(); listener.getResult(); // Do delete action Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE", FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8) )); while(deleteActionResult.hasNext()){ Result result = deleteActionResult.next(); System.out.println("Do Delete Action: " + new String(result.getBody(), StandardCharsets.UTF_8)); } // Get all metadata information Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL); flightInfos.forEach(t -> System.out.println(t)); System.out.println("List Flights Info (after delete): No records"); } .. testoutput:: Do Delete Action: Delete completed List Flights Info (after delete): No records
How it could be:
Only offer to the user the main code but running behind scene all the code needed
// Server @Override public void doAction(CallContext context, Action action, StreamListener<Result> listener) { FlightDescriptor flightDescriptor = FlightDescriptor.path(new String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for key configured if(dataInMemory.containsKey(flightDescriptor)) { switch (action.getType()) { case "DELETE": dataInMemory.remove(flightDescriptor); Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8)); listener.onNext(result); } listener.onCompleted(); } } // Client // Do delete action Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE", FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8) )); while(deleteActionResult.hasNext()){ Result result = deleteActionResult.next(); System.out.println("Do Delete Action: " + new String(result.getBody(), StandardCharsets.UTF_8)); } // Get all metadata information Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL); flightInfos.forEach(t -> System.out.println(t)); System.out.println("List Flights Info (after delete): No records");