Description
@Override protected <X> void pushToOperator(StreamRecord<X> record) { try { // we know that the given outputTag matches our OutputTag so the record // must be of the type that our operator (and Serializer) expects. @SuppressWarnings("unchecked") StreamRecord<T> castRecord = (StreamRecord<T>) record; numRecordsIn.inc(); StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue())); operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { // Enrich error message ClassCastException replace = new ClassCastException( String.format( "%s. Failed to push OutputTag with id '%s' to operator. " + "This can occur when multiple OutputTags with different types " + "but identical names are being used.", e.getMessage(), outputTag.getId())); throw new ExceptionInChainedOperatorException(replace); } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); } }
If outputTag is null (as is the case when no sideOutput was defined) the catch block will crash with a NullPointerException. This may happen if operator.processElement throws a ClassCastException.
Attachments
Issue Links
- is duplicated by
-
FLINK-8616 Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator masks ClassCastException
- Closed
-
FLINK-9252 OperatorChain.pushToOperators NullPointerException when ClassCastException
- Closed
- links to