diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index b6c8890..0866850 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -64,48 +64,48 @@
   private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]";

   protected HiveConf conf;
-  private StreamingConnection conn;
+  protected StreamingConnection conn;
   protected Table table;
-  List<String> inputColumns;
-  List<String> inputTypes;
-  private String fullyQualifiedTableName;
-  private Map<String, List<RecordUpdater>> updaters = new HashMap<>();
-  private Map<String, Path> partitionPaths = new HashMap<>();
-  private Set<String> addedPartitions = new HashSet<>();
+  protected List<String> inputColumns;
+  protected List<String> inputTypes;
+  protected String fullyQualifiedTableName;
+  protected Map<String, List<RecordUpdater>> updaters = new HashMap<>();
+  protected Map<String, Path> partitionPaths = new HashMap<>();
+  protected Set<String> addedPartitions = new HashSet<>();
   // input OI includes table columns + partition columns
-  private StructObjectInspector inputRowObjectInspector;
+  protected StructObjectInspector inputRowObjectInspector;
   // output OI strips off the partition columns and retains other columns
-  private ObjectInspector outputRowObjectInspector;
-  private List<String> partitionColumns = new ArrayList<>();
-  private ObjectInspector[] partitionObjInspectors = null;
-  private StructField[] partitionStructFields = null;
-  private Object[] partitionFieldData;
-  private ObjectInspector[] bucketObjInspectors = null;
-  private StructField[] bucketStructFields = null;
-  private Object[] bucketFieldData;
-  private List<Integer> bucketIds = new ArrayList<>();
-  private int totalBuckets;
-  private String defaultPartitionName;
-  private boolean isBucketed;
-  private AcidOutputFormat<?, ?> acidOutputFormat;
-  private Long curBatchMinWriteId;
-  private Long curBatchMaxWriteId;
-  private final String lineDelimiter;
-  private HeapMemoryMonitor heapMemoryMonitor;
+  protected ObjectInspector outputRowObjectInspector;
+  protected List<String> partitionColumns = new ArrayList<>();
+  protected ObjectInspector[] partitionObjInspectors = null;
+  protected StructField[] partitionStructFields = null;
+  protected Object[] partitionFieldData;
+  protected ObjectInspector[] bucketObjInspectors = null;
+  protected StructField[] bucketStructFields = null;
+  protected Object[] bucketFieldData;
+  protected List<Integer> bucketIds = new ArrayList<>();
+  protected int totalBuckets;
+  protected String defaultPartitionName;
+  protected boolean isBucketed;
+  protected AcidOutputFormat<?, ?> acidOutputFormat;
+  protected Long curBatchMinWriteId;
+  protected Long curBatchMaxWriteId;
+  protected final String lineDelimiter;
+  protected HeapMemoryMonitor heapMemoryMonitor;
   // if low memory canary is set and if records after set canary exceeds threshold, trigger a flush.
   // This is to avoid getting notified of low memory too often and flushing too often.
-  private AtomicBoolean lowMemoryCanary;
-  private long ingestSizeBytes = 0;
-  private boolean autoFlush;
-  private float memoryUsageThreshold;
-  private long ingestSizeThreshold;
+  protected AtomicBoolean lowMemoryCanary;
+  protected long ingestSizeBytes = 0;
+  protected boolean autoFlush;
+  protected float memoryUsageThreshold;
+  protected long ingestSizeThreshold;

   public AbstractRecordWriter(final String lineDelimiter) {
     this.lineDelimiter = lineDelimiter == null || lineDelimiter.isEmpty() ?
       DEFAULT_LINE_DELIMITER_PATTERN : lineDelimiter;
   }

-  private static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener {
+  protected static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener {
     private static final Logger LOG = LoggerFactory.getLogger(OrcMemoryPressureMonitor.class.getName());
     private final AtomicBoolean lowMemoryCanary;
@@ -179,7 +179,7 @@ public void init(StreamingConnection conn, long minWriteId, long maxWriteId) thr
     }
   }

-  private void setupMemoryMonitoring() {
+  protected void setupMemoryMonitoring() {
     this.autoFlush = conf.getBoolVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_ENABLED);
     this.memoryUsageThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD);
     this.ingestSizeThreshold = conf.getSizeVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE);
@@ -201,7 +201,7 @@ private void setupMemoryMonitoring() {
     }
   }

-  private void prepareBucketingFields() {
+  protected void prepareBucketingFields() {
     this.isBucketed = table.getSd().getNumBuckets() > 0;
     // For unbucketed tables we have exactly 1 RecordUpdater (until HIVE-19208) for each AbstractRecordWriter which
     // ends up writing to a file bucket_000000.
@@ -219,7 +219,7 @@ private void prepareBucketingFields() {
     }
   }

-  private void preparePartitioningFields() {
+  protected void preparePartitioningFields() {
     final int numPartitions = table.getPartitionKeys().size();
     this.partitionFieldData = new Object[numPartitions];
     this.partitionObjInspectors = new ObjectInspector[numPartitions];
@@ -240,12 +240,12 @@ private void preparePartitioningFields() {
   /**
    * used to tag error msgs to provided some breadcrumbs
    */
-  private String getWatermark(String partition) {
+  protected String getWatermark(String partition) {
     return partition + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]";
   }

   // return the column numbers of the bucketed columns
-  private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
+  protected List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
     ArrayList<Integer> result = new ArrayList<>(bucketCols.size());
     HashSet<String> bucketSet = new HashSet<>(bucketCols);
     for (int i = 0; i < cols.size(); i++) {
@@ -275,7 +275,7 @@ private String getWatermark(String partition) {
   public abstract Object encode(byte[] record) throws SerializationError;

   // returns the bucket number to which the record belongs to
-  private int getBucket(Object row) {
+  protected int getBucket(Object row) {
     if (!isBucketed) {
       return 0;
     }
@@ -288,7 +288,7 @@ private int getBucket(Object row) {
       ObjectInspectorUtils.getBucketNumberOld(bucketFields, bucketObjInspectors, totalBuckets);
   }

-  private List<String> getPartitionValues(final Object row) {
+  protected List<String> getPartitionValues(final Object row) {
     if (!conn.isPartitionedTable()) {
       return null;
     }
@@ -359,7 +359,7 @@ public void close() throws StreamingIOFailure {
     }
   }

-  private static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+  protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
     , StructObjectInspector recordObjInspector) {

     ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
@@ -371,14 +371,14 @@ public void close() throws StreamingIOFailure {
     return result;
   }

-  private Object[] getBucketFields(Object row) {
+  protected Object[] getBucketFields(Object row) {
     for (int i = 0; i < bucketIds.size(); i++) {
       bucketFieldData[i] = inputRowObjectInspector.getStructFieldData(row, bucketStructFields[i]);
     }
     return bucketFieldData;
   }

-  private Object[] getPartitionFields(Object row) {
+  protected Object[] getPartitionFields(Object row) {
     for (int i = 0; i < partitionFieldData.length; i++) {
       partitionFieldData[i] = inputRowObjectInspector.getStructFieldData(row, partitionStructFields[i]);
     }
@@ -412,7 +412,7 @@ public void write(final long writeId, final byte[] record) throws StreamingExcep
     }
   }

-  private void checkAutoFlush() throws StreamingIOFailure {
+  protected void checkAutoFlush() throws StreamingIOFailure {
     if (!autoFlush) {
       return;
     }
@@ -444,7 +444,7 @@ private void checkAutoFlush() throws StreamingIOFailure {
     return addedPartitions;
   }

-  private RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId, Long minWriteId,
+  protected RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId, Long minWriteId,
     Long maxWriteID) throws IOException {

     // Initialize table properties from the table parameters. This is required because the table
@@ -463,7 +463,7 @@ private RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId
       .finalDestination(partitionPath));
   }

-  private RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketId) throws StreamingIOFailure {
+  protected RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketId) throws StreamingIOFailure {
     RecordUpdater recordUpdater;
     String key;
     Path destLocation;
@@ -510,7 +510,7 @@ private RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketI
     return recordUpdater;
   }

-  private List<RecordUpdater> initializeBuckets() {
+  protected List<RecordUpdater> initializeBuckets() {
     List<RecordUpdater> result = new ArrayList<>(totalBuckets);
     for (int bucket = 0; bucket < totalBuckets; bucket++) {
       result.add(bucket, null); //so that get(i) returns null rather than ArrayOutOfBounds
@@ -518,7 +518,7 @@ private RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketI
     return result;
   }

-  private void logStats(final String prefix) {
+  protected void logStats(final String prefix) {
     int openRecordUpdaters = updaters.values()
       .stream()
       .mapToInt(List::size)
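
Not part of the patch: a minimal sketch of the kind of subclass the private -> protected change above is meant to enable. The class name, logger and log message are hypothetical; only AbstractRecordWriter, StreamingIOFailure and the members widened in this diff are taken from the source. The sketch is left abstract so the writer's existing abstract members (e.g. encode()) remain with a concrete subclass.

import org.apache.hive.streaming.AbstractRecordWriter;
import org.apache.hive.streaming.StreamingIOFailure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical subclass: hooks into the auto-flush path and reads state that this
// patch exposes (fullyQualifiedTableName, ingestSizeBytes, conn, checkAutoFlush()).
public abstract class InstrumentedRecordWriter extends AbstractRecordWriter {
  private static final Logger LOG = LoggerFactory.getLogger(InstrumentedRecordWriter.class);

  public InstrumentedRecordWriter(String lineDelimiter) {
    super(lineDelimiter);
  }

  @Override
  protected void checkAutoFlush() throws StreamingIOFailure {
    // These members are reachable from a subclass only because the patch widens them to protected.
    LOG.debug("auto-flush check for {} (partitioned={}, ~{} bytes ingested so far)",
        fullyQualifiedTableName, conn.isPartitionedTable(), ingestSizeBytes);
    super.checkAutoFlush();
  }
}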