Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-24885

ProcessElement Interface parameter Collector : java.lang.NullPointerException

    XMLWordPrintableJSON

Details

    Description

      2021-11-15 11:11:55,032 INFO com.asap.demo.function.dealMapFunction [] - size:160
      2021-11-15 11:11:55,230 WARN org.apache.flink.runtime.taskmanager.Task [] - Co-Process-Broadcast-Keyed -> Map -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_8, type=com.asap.demo.model.BeanField<`account` STRING, `accountId` STRING, `accountIn` STRING, `accountInName` STRING, `accountInOrgId` STRING, `accountInOrgName` STRING, `accountInType` STRING, `accountName` STRING, `accountOrgId` STRING, `accountOrgName` STRING, `accountOut` STRING, `accountOutName` STRING, `accountOutOrgId` STRING, `accountOutOrgName` STRING, `accountOutType` STRING, `accountStatus` STRING, `accountType` STRING, `action` STRING, `actionDesc` STRING, `alarmcontext` STRING, `alarmgrade` STRING, `alarmtype` STRING, `alertId` STRING, `alertInfo` STRING, `alertLevel` STRING, `alertSignatureIdL` STRING, `appId` STRING, `appName` STRING, `appProtocol` STRING, `appType` STRING, `areaId` STRING, `areaName` STRING, `areaType` STRING, `assetFrom` STRING, `assetId` STRING, `assetInfo` STRING, `assetIp` STRING, `assetLevel` STRING, `assetName` STRING, `assetPid` STRING, `assetType` STRING, `assetUse` STRING, `assetVendor` STRING, `attackStage` STRING, `attackStageCode` STRING, `attackType` STRING, `attackTypeName` STRING, `authSerNum` STRING, `authTime` STRING, `authType` STRING, `bankSeqNum` STRING, `batchNo` STRING, `blackDomain` STRING, `blackDomainDesc` STRING, `blackDomainTag` STRING, `blackDstIp` STRING, `blackFile` STRING, `blackFileDesc` STRING, `blackFileTag` STRING, `blackId` STRING, `blackIpTag` STRING, `blackSrcIp` STRING, `blackTag` STRING, `blackTagMatchCount` STRING, `blackTagMatchDesc` STRING, `blackUrl` STRING, `blackUrlDesc` STRING, `blackUrlTag` STRING, `blackVulnCve` STRING, `blackVulnDesc` STRING, `blackVulnName` STRING, `blackVulnTag` STRING, `branchId` STRING, `branchName` STRING, `businessSystemName` STRING, `businessType` STRING, `cardId` STRING, `cascadeSourceIp` STRING, `cascadeSourceName` STRING, `cebUid` STRING, `certNum` STRING, `certType` STRING, `chainId` STRING, `channel` STRING, `channelId` STRING, `character` STRING, `charge` STRING, `cifSeqNum` STRING, `clientInfo` STRING, `clientIp` STRING, `clientMac` STRING, `clientName` STRING, `clientPort` STRING, `collectTime` TIMESTAMP_LTZ(9), `collectTimeL` TIMESTAMP_LTZ(9), `command` STRING, `commandLine` STRING, `commandResult` STRING, `company` STRING, `companyCustomId` STRING, `companyId` STRING, `completenessTag` STRING, `confidence` STRING, `confidenceLevel` STRING, `consignedUser` STRING, `contractNo` STRING, `count` STRING, `couponAmount` STRING, `couponId` STRING, `createTime` TIMESTAMP_LTZ(3), `createTimeL` BIGINT, `createdBy` STRING, `curType` STRING, `currency` STRING, `currentBal` STRING, `customLabel1` STRING, `customLabel10` STRING, `customLabel2` STRING, `customLabel3` STRING, `customLabel4` STRING, `customLabel5` STRING, `customLabel6` STRING, `customLabel7` STRING, `customLabel8` STRING, `customLabel9` STRING, `customValue1` STRING, `customValue10` STRING, `customValue2` STRING, `customValue3` STRING, `customValue4` STRING, `customValue5` STRING, `customValue6` STRING, `customValue7` STRING, `customValue8` STRING, `customValue9` STRING, `dataQualityTag` STRING, `dataType` STRING, `dataTypeName` STRING, `dbInstance` STRING, `dbName` STRING, `dbTable` STRING, `dbVersion` STRING, `dealSuggest` STRING, `defVManagerId` STRING, `department` STRING, `deviceCategory` STRING, `deviceId` STRING, `deviceIp` STRING, `deviceMac` STRING, `deviceName` STRING, `deviceParentType` STRING, `deviceType` STRING, `deviceVersion` STRING, `direction` STRING, `directionDesc` STRING, `directionOfAttackTag` STRING, `domain` STRING, `dstAdminAccount` STRING, `dstAdminEmail` STRING, `dstAdminFOrgId` STRING, `dstAdminId` STRING, `dstAdminMobile` STRING, `dstAdminName` STRING, `dstAdminOrgId` STRING, `dstAdminOrgName` STRING, `dstAdminType` STRING, `dstAsset` STRING, `dstAssetId` STRING, `dstAssetInfo` STRING, `dstAssetKey` STRING, `dstAssetLevel` STRING, `dstAssetModel` STRING, `dstAssetName` STRING, `dstAssetPid` STRING, `dstAssetStatus` STRING, `dstAssetSubType` STRING, `dstAssetType` STRING, `dstAssetVendor` STRING, `dstBizId` STRING, `dstCity` STRING, `dstCompany` STRING, `dstCountry` STRING, `dstDbInstance` STRING, `dstDomainName` STRING, `dstFGroupId` STRING, `dstGroupId` STRING, `dstGroupName` STRING, `dstHostName` STRING, `dstIndustry` STRING, `dstIntelDesc` STRING, `dstIntelId` STRING, `dstIntelType` STRING, `dstInterface` STRING, `dstIp` STRING, `dstIpL` STRING, `dstLatitude` STRING, `dstLongitude` STRING, `dstMac` STRING, `dstManagerIp` STRING, `dstNatIp` STRING, `dstNatPort` STRING, `dstOperator` STRING, `dstOrgAdmin` STRING, `dstOrgId` STRING, `dstOrgName` STRING, `dstOsId` STRING, `dstPort` STRING, `dstPost` STRING, `dstProvince` STRING, `dstService` STRING, `dstSubDomainName` STRING, `dstUser` STRING, `dstZone` STRING, `duration` STRING, `empNum` STRING, `endTime` TIMESTAMP_LTZ(9), `engineName` STRING, `entryTime` TIMESTAMP_LTZ(9), `errorCode` STRING, `eventAppendix` STRING, `eventCount` STRING, `eventId` STRING, `eventIp` STRING, `eventName` STRING, `eventOneType` STRING, `eventOneTypeDesc` STRING, `eventOneTypeName` STRING, `eventParentType` STRING, `eventThreeType` STRING, `eventThreeTypeDesc` STRING, `eventThreeTypeName` STRING, `eventTwoType` STRING, `eventTwoTypeDesc` STRING, `eventTwoTypeName` STRING, `eventType` STRING, `fileHash` STRING, `fileName` STRING, `filePath` STRING, `fileSize` STRING, `fileType` STRING, `flag` STRING, `flow` STRING, `flowAvg` STRING, `flowDiscard` STRING, `flowDown` STRING, `flowMax` STRING, `flowNum` STRING, `flowUp` STRING, `groupId` STRING, `groupName` STRING, `id` STRING, `idCard` STRING, `indexTag` STRING, `infectionDstIp` STRING, `infectionDstName` STRING, `infectionFile` STRING, `infectionIp` STRING, `infectionSrcIp` STRING, `infectionSrcName` STRING, `installNum` STRING, `instance` STRING, `interestedIp` STRING, `intranetInternetTag` STRING, `ipType` STRING, `isBack` STRING, `jobTitle` STRING, `languageSign` STRING, `lastLoginTime` TIMESTAMP_LTZ(9), `lastUpdBy` STRING, `lastUpdTime` TIMESTAMP_LTZ(9), `latnId` STRING, `length` STRING, `location` STRING, `lockDesc` STRING, `lockTime` TIMESTAMP_LTZ(9), `logStatus` STRING, `logSubType` STRING, `logType` STRING, `loginTime` TIMESTAMP_LTZ(9), `loginType` STRING, `loginWay` STRING, `mailAdd` STRING, `mailIn` STRING, `mailOut` STRING, `mailRecipient` STRING, `mailSender` STRING, `mailSubject` STRING, `mailTotal` STRING, `mailType` STRING, `mainAccount` STRING, `mainAccountCreateTime` TIMESTAMP_LTZ(9), `mainAccountCreateUser` STRING, `mainAccountDesc` STRING, `mainAccountId` STRING, `mainAccountInvalidTime` TIMESTAMP_LTZ(9), `mainAccountLoginDateLast` STRING, `mainAccountLoginFailCount` STRING, `mainAccountModifyPwdTime` TIMESTAMP_LTZ(9), `mainAccountModifyTime` TIMESTAMP_LTZ(9), `mainAccountStatus` STRING, `mainAccountType` STRING, `mainAccountValidTime` TIMESTAMP_LTZ(9), `malwareName` STRING, `malwareSubType` STRING, `malwareType` STRING, `managerId` STRING, `managerIp` STRING, `managerTypeId` STRING, `menuDesc` STRING, `menuId` STRING, `menuName` STRING, `menuPid` STRING, `menuStatus` STRING, `menuType` STRING, `merchantId` STRING, `merchantName` STRING, `message` STRING, `method` STRING, `missingField` STRING, `model` STRING, `module` STRING, `moduleId` STRING, `name` STRING, `networkType` STRING, `newValue` STRING, `nextFlowNum` STRING, `object` STRING, `oldIdCard` STRING, `oldValue` STRING, `openid` STRING, `operType` STRING, `operTypeName` STRING, `ordSerNum` STRING, `order` STRING, `orderNo` STRING, `orderType` STRING, `orgCode` STRING, `orgId` STRING, `orgName` STRING, `orgNameLevel` STRING, `orgNamePath` STRING, `osId` STRING, `osName` STRING, `osVersion` STRING, `parentGroupId` STRING, `parentOrgId` STRING, `parentOrgName` STRING, `parentOrgNamePath` STRING, `passUpdateTime` STRING, `password` STRING, `payItemId` STRING, `payItemName` STRING, `payTime` TIMESTAMP_LTZ(9), `payUnitName` STRING, `payUnitType` STRING, `personId` STRING, `personName` STRING, `phone` STRING, `phoneImer` STRING, `phs` STRING, `policyId` STRING, `policyInfo` STRING, `policyName` STRING, `position` STRING, `priority` STRING, `profession` STRING, `professionName` STRING, `protocol` STRING, `provinceFromId` STRING, `provinceFromName` STRING, `rate` STRING, `rawMsg` STRING, `realFee` STRING, `reason` STRING, `receFee` STRING, `recvPacket` STRING, `recvSize` STRING, `refundOrderNo` STRING, `refundOrderTime` TIMESTAMP_LTZ(9), `registerNum` STRING, `registerRate` STRING, `rejCode` STRING, `relateAccount` STRING, `relateAccountId` STRING, `relateAccountName` STRING, `remark` STRING, `requestMessage` STRING, `requestNo` STRING, `requestTime` TIMESTAMP_LTZ(9), `responseCode` STRING, `responseIp` STRING, `responseMessage` STRING, `result` STRING, `resultCode` STRING, `resultDesc` STRING, `retain` STRING, `riskLevel` STRING, `riskLevelDesc` STRING, `roleId` STRING, `roleName` STRING, `ruleId` STRING, `ruleName` STRING, `ruleTjCount` STRING, `safetyMargin` STRING, `sceneId` STRING, `sceneOneType` STRING, `sceneThreeType` STRING, `sceneTwoType` STRING, `sendPacket` STRING, `sendSize` STRING, `serialNum` STRING, `serverIp` STRING, `serverName` STRING, `serverPort` STRING, `service` STRING, `serviceTime` TIMESTAMP_LTZ(9), `sessionCount` BIGINT, `sessionId` STRING, `settleMethod` STRING, `sex` STRING, `shareFlag` STRING, `sid` STRING, `signData` STRING, `snowId` STRING, `softwareInfo` STRING, `source` STRING, `srcAdminAccount` STRING, `srcAdminEmail` STRING, `srcAdminFOrgId` STRING, `srcAdminId` STRING, `srcAdminMobile` STRING, `srcAdminName` STRING, `srcAdminOrgId` STRING, `srcAdminOrgName` STRING, `srcAdminType` STRING, `srcAsset` STRING, `srcAssetId` STRING, `srcAssetInfo` STRING, `srcAssetKey` STRING, `srcAssetLevel` STRING, `srcAssetModel` STRING, `srcAssetName` STRING, `srcAssetPid` STRING, `srcAssetStatus` STRING, `srcAssetSubType` STRING, `srcAssetType` STRING, `srcAssetVendor` STRING, `srcBizId` STRING, `srcCity` STRING, `srcCompany` STRING, `srcContnent` STRING, `srcCountry` STRING, `srcDbInstance` STRING, `srcDomainName` STRING, `srcFGroupId` STRING, `srcGroupId` STRING, `srcGroupName` STRING, `srcHostName` STRING, `srcIndustry` STRING, `srcIntelDesc` STRING, `srcIntelId` STRING, `srcIntelType` STRING, `srcInterface` STRING, `srcIp` STRING, `srcIpL` STRING, `srcLatitude` STRING, `srcLongitude` STRING, `srcMac` STRING, `srcManagerIp` STRING, `srcNatIp` STRING, `srcNatPort` STRING, `srcOperator` STRING, `srcOrgAdmin` STRING, `srcOrgId` STRING, `srcOrgName` STRING, `srcOsId` STRING, `srcPort` STRING, `srcPost` STRING, `srcProvince` STRING, `srcService` STRING, `srcSubDomainName` STRING, `srcUser` STRING, `srcZone` STRING, `staffCode` STRING, `staffCrm` STRING, `staffName` STRING, `staffState` STRING, `startTime` TIMESTAMP_LTZ(9), `status` STRING, `subAccount` STRING, `subAccountCreateTime` TIMESTAMP_LTZ(9), `subAccountCreateUser` STRING, `subAccountDesc` STRING, `subAccountId` STRING, `subAccountInvalidTime` TIMESTAMP_LTZ(9), `subAccountLoginDateLast` STRING, `subAccountLoginFailCount` STRING, `subAccountModifyPwdTime` TIMESTAMP_LTZ(9), `subAccountModifyTime` TIMESTAMP_LTZ(9), `subAccountStatus` STRING, `subAccountType` STRING, `subAccountValidTime` TIMESTAMP_LTZ(9), `sumAreaId` STRING, `sumManagerId` STRING, `tag` STRING, `taskId` STRING, `taskName` STRING, `telephone` STRING, `telephoneType` STRING, `tenantId` STRING, `tenantName` STRING, `terminalNum` STRING, `threatName` STRING, `threatType` STRING, `threatTypeDesc` STRING, `transBal` STRING, `transChannel` STRING, `transCode` STRING, `transId` STRING, `transName` STRING, `transStatus` STRING, `transTime` TIMESTAMP_LTZ(9), `transType` STRING, `type` STRING, `unitName` STRING, `updateTime` TIMESTAMP_LTZ(9), `upmpQn` STRING, `upmpSerialNum` STRING, `url` STRING, `user` STRING, `userGroupId` STRING, `userGroupName` STRING, `userId` STRING, `userOrgId` STRING, `userOrgName` STRING, `userType` STRING, `uuId` STRING, `value` STRING, `version` STRING, `voidOrderNo` STRING, `vulnId` STRING, `vulnInfo` STRING, `vulnLevel` STRING, `vulnName` STRING, `vulnType` STRING, `weixinId` STRING, `weixinVersion` STRING, `wpTag` STRING, `writeOffTime` TIMESTAMP_LTZ(9)>, rowtime=false, watermark=true) -> Calc(select=[eventTwoType, deviceParentType, type, eventName, directionDesc, srcIp, dstIp, createTime, snowId]) -> SinkConversionToTuple2 -> Sink: Print to Std. Out (1/1)#0 (851b2092ae4f274d5c7be1f2ea7acaba) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
      at SinkConversion$22.processElement(Unknown Source)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
      at StreamExecCalc$18.processElement(Unknown Source)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
      at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
      at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
      at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
      at com.asap.demo.function.dealStreamProcessFunction.match(dealStreamProcessFunction.java:131)
      at com.asap.demo.function.dealStreamProcessFunction.processElement(dealStreamProcessFunction.java:115)
      at com.asap.demo.function.dealStreamProcessFunction.processElement(dealStreamProcessFunction.java:33)
      at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:125)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:213)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:178)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
      at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
      at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
      at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
      at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
      at java.lang.Thread.run(Thread.java:748)

      Attachments

        1. error.jpg
          354 kB
          wangbaohua

        Issue Links

          Activity

            People

              Unassigned Unassigned
              wangbaohua wangbaohua
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: