Details
- Type: New Feature
- Status: Open
- Priority: P3
- Resolution: Unresolved
- Affects Version/s: 2.12.0
- Fix Version/s: None
- Labels: None
- Component/s: Dataflow
Description
While developing my code, I used the snippet below to read table data from BigQuery.
PCollection<ReasonCode> gpseEftReasonCodes = input
    .apply("Reading xxyyzz",
        BigQueryIO
            .read(new ReadTable<ReasonCode>(ReasonCode.class))
            .withoutValidation()
            .withTemplateCompatibility()
            .fromQuery("Select * from dataset.xxyyzz")
            .usingStandardSql()
            .withCoder(SerializableCoder.of(ReasonCode.class)));
Read Table Class:
@DefaultSchema(JavaBeanSchema.class)
public class ReadTable<T> implements SerializableFunction<SchemaAndRecord, T> {
    private static final long serialVersionUID = 1L;
    private static Gson gson = new Gson();
    public static final Logger LOG = LoggerFactory.getLogger(ReadTable.class);
    private final Counter countingRecords =
        Metrics.counter(ReadTable.class, "Reading Records EFT Report");
    private Class<T> class1;

    public ReadTable(Class<T> class1) {
        this.class1 = class1;
    }

    public T apply(SchemaAndRecord schemaAndRecord) {
        Map<String, String> mapping = new HashMap<>();
        try {
            GenericRecord record = schemaAndRecord.getRecord();
            org.apache.avro.Schema schema = record.getSchema();
            // Copy every Avro field into a string map, then let Gson bind the
            // map onto the POJO. (Fields are looked up by name; indexing the
            // record with a running counter, as originally written, reads the
            // wrong field.)
            for (Field f : schema.getFields()) {
                Object value = record.get(f.name());
                mapping.put(f.name(), value == null ? null : String.valueOf(value));
            }
            countingRecords.inc();
            JsonElement jsonElement = gson.toJsonTree(mapping);
            return gson.fromJson(jsonElement, class1);
        } catch (Exception e) {
            LOG.error("Found wrong mapping for the record: " + mapping, e);
            return null;
        }
    }
}
After reading the data from BigQuery and mapping the SchemaAndRecord onto the POJO, columns whose BigQuery data type is NUMERIC came back as the value below.
last_update_amount=java.nio.HeapByteBuffer[pos=0 lim=16 cap=16]
My expectation was to get the exact numeric value, but I get a HeapByteBuffer instead. The version I am using is Apache Beam 2.12.0. If any more information is needed, please let me know.
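For context, the Avro export of a BigQuery NUMERIC column carries the value as the Avro decimal logical type: a big-endian two's-complement unscaled integer with scale 9 (the 16-byte buffer in the output above), which is why the raw field surfaces as a ByteBuffer. A minimal sketch of decoding such a buffer by hand, assuming the column really is NUMERIC (last_update_amount is the field from the output above):

// Sketch: decode a BigQuery NUMERIC value delivered as Avro decimal bytes.
// Assumes the standard NUMERIC encoding: big-endian two's-complement
// unscaled integer with scale 9.
ByteBuffer buf = (ByteBuffer) record.get("last_update_amount");
byte[] bytes = new byte[buf.remaining()];
buf.duplicate().get(bytes);  // duplicate() so the buffer's position is untouched
BigDecimal decoded = new BigDecimal(new java.math.BigInteger(bytes), 9);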
Second approach tried:
GenericRecord record = schemaAndRecord.getRecord();
org.apache.avro.Schema schema = record.getSchema();
for (Field f : schema.getFields()) {
    Object value = record.get(f.name());
    mapping.put(f.name(), value == null ? null : String.valueOf(value));
    if (f.name().equalsIgnoreCase("reason_code_id")) {
        // This call throws the AvroRuntimeException below: schema.getType() is
        // the type of the enclosing record (RECORD), not of the field being decoded.
        BigDecimal numericValue = new Conversions.DecimalConversion()
            .fromBytes((ByteBuffer) record.get(f.name()),
                Schema.create(schema.getType()),
                schema.getLogicalType());
        System.out.println("Numeric Con" + numericValue);
    } else {
        System.out.println("Else Condition " + f.name());
    }
}
Resulting exception:
2019-05-24 (14:10:37) org.apache.avro.AvroRuntimeException: Can't create a: RECORD
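For what it's worth, a sketch of what presumably fixes that call (an assumption on my part, not verified): pass the field's own Avro schema to the conversion, unwrapping the nullable union, instead of the record schema:

// Sketch: use the field's schema and its logical type for the decimal conversion.
// NUMERIC columns are typically exported as a union ["null", bytes(decimal)],
// so unwrap the union before asking for the logical type.
org.apache.avro.Schema fieldSchema = f.schema();
if (fieldSchema.getType() == org.apache.avro.Schema.Type.UNION) {
    for (org.apache.avro.Schema member : fieldSchema.getTypes()) {
        if (member.getType() != org.apache.avro.Schema.Type.NULL) {
            fieldSchema = member;
            break;
        }
    }
}
BigDecimal numericValue = new Conversions.DecimalConversion()
    .fromBytes((ByteBuffer) record.get(f.name()), fieldSchema, fieldSchema.getLogicalType());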
It would be great to have a method that maps BigQuery data onto the POJO schema directly. That is, if the table has 10 columns and my POJO needs only 5 of them, BigQueryIO should map just those 5 values into the Java class and reject the rest, instead of requiring the manual effort shown above.
The NUMERIC data type should be deserialized automatically while fetching data, as it already is when reading into TableRow.
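In the meantime, one possible workaround (an illustrative sketch, not a confirmed recipe: the column names and ReasonCode setters below are hypothetical) is to project only the needed columns in the query, read them as TableRow, and map the subset by hand; in TableRow form, NUMERIC arrives as a string-like value that BigDecimal can parse:

// Sketch of a workaround: select only the POJO's columns in SQL, read
// TableRows, then map the fields manually. Column and setter names are
// hypothetical.
PCollection<ReasonCode> reasonCodes = input
    .apply("ReadSelectedColumns",
        BigQueryIO.readTableRows()
            .fromQuery("SELECT reason_code_id, last_update_amount FROM dataset.xxyyzz")
            .usingStandardSql()
            .withoutValidation())
    .apply("MapToReasonCode",
        MapElements.into(TypeDescriptor.of(ReasonCode.class))
            .via((TableRow row) -> {
                ReasonCode rc = new ReasonCode();
                rc.setReasonCodeId((String) row.get("reason_code_id"));
                rc.setLastUpdateAmount(new BigDecimal((String) row.get("last_update_amount")));
                return rc;
            }));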