Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-47156

SparkSession returns a null context during a dataset creation

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Not A Bug
    • 3.4.2
    • None
    • Spark Core
    • None
    • Debian 12

      Java 17

    Description

      I need first to know if I'm in front of a bug or not.

      If it's the case, I'll manage to create a test to help you reproduce the case, but if it isn't, maybe Spark documentation could explain when sparkSession.getContext() can return null.

       

      I'm willing to ease my development by separating :

      • parquet files management { checking existence, then loading them as cache, or saving data to them },
      • from dataset creation, when it doesn't exist yet, and should be constituted from scratch.

       

      The method I'm using is this one:

      {code:Java}
      protected Dataset<Row> constitutionStandard(OptionsCreationLecture optionsCreationLecture,
         Supplier<Dataset<Row>> worker, CacheParqueteur<? extends TriParqueteurIF> cacheParqueteur) {
         OptionsCreationLecture options = optionsCreationLecture != null ? optionsCreationLecture : optionsCreationLecture();
      
         Dataset<Row> dataset = cacheParqueteur.call(options.useCache());
         return dataset == null ? cacheParqueteur.save(cacheParqueteur.appliquer(worker.get())) : dataset;
      }
      

      In case the dataset doesn't exist in parquet files (= cache) yet, it starts its creation by calling a worker.get() that is a Supplier of Dataset<Row>.

       

      A concrete usage is this one:

      {code:Java}
      public Dataset<Row> rowEtablissements(OptionsCreationLecture optionsCreationLecture, HistoriqueExecution historiqueExecution, int anneeCOG, int anneeSIRENE, boolean actifsSeulement, boolean communesValides, boolean nomenclaturesNAF2Valides) {
         OptionsCreationLecture options = optionsCreationLecture != null ? optionsCreationLecture : optionsCreationLecture();
         Supplier<Dataset<Row>> worker = () -> {
            super.setStageDescription(this.messageSource, "row.etablissements.libelle.long", "row.etablissements.libelle.court", anneeSIRENE, anneeCOG, actifsSeulement, communesValides, nomenclaturesNAF2Valides);
            
            Map<String, Integer> indexs = new HashMap<>();
            Dataset<Row> etablissements = etablissementsNonFiltres(optionsCreationLecture, anneeSIRENE);
      
            etablissements = etablissements.filter(
               (FilterFunction<Row>)etablissement -> this.validator.validationEtablissement(this.session, historiqueExecution, etablissement, actifsSeulement, nomenclaturesNAF2Valides, indexs));
      
            // Si le filtrage par communes valides a été demandé, l'appliquer.
            if (communesValides) {
               etablissements = rowRestreindreAuxCommunesValides(etablissements, anneeCOG, anneeSIRENE, indexs);
            }
            else {
               etablissements = etablissements.withColumn("codeDepartement", substring(CODE_COMMUNE.col(), 1, 2));
            }
      
            // Associer les libellés des codes APE/NAF.
            Dataset<Row> nomenclatureNAF = this.nafDataset.rowNomenclatureNAF(anneeSIRENE);
            etablissements = etablissements.join(nomenclatureNAF, etablissements.col("activitePrincipale").equalTo(nomenclatureNAF.col("codeNAF")) , "left_outer")
               .drop("codeNAF", "niveauNAF");
      
            // Le dataset est maintenant considéré comme valide, et ses champs peuvent être castés dans leurs types définitifs.
            return this.validator.cast(etablissements);
         };
      
         return constitutionStandard(options, () -> worker.get()
            .withColumn("partitionSiren", SIREN_ENTREPRISE.col().substr(1,2)),
            new CacheParqueteur<>(options, this.session,
               "etablissements", "annee_{0,number,#0}-actifs_{1}-communes_verifiees_{2}-nafs_verifies_{3}", DEPARTEMENT_SIREN_SIRET,
               anneeSIRENE, anneeCOG, actifsSeulement, communesValides));
      } 

       

      In the worker, a filter calls a validationEtablissement(SparkSession, HistoriqueExecution, Row, ...) on each row to perform complete checking (eight rules to check for an establishment validity).

      When a check fails, along with a warning log, I'm also counting in historiqueExecution object the number of problems of that kind I've encountered.

      That function increase a longAccumulator value, and create that accumulator first, that it stores in a Map<String, LongAccumulator> accumulators,  if needed.

      {code:Java}
      public void incrementerOccurrences(SparkSession session, String codeOuFormatMessage, boolean creerSiAbsent) {
         LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);
      
         if (accumulator == null && creerSiAbsent) {
            accumulator = session.sparkContext().longAccumulator(codeOuFormatMessage);
            accumulators.put(codeOuFormatMessage, accumulator);
         }
      
         if (accumulator != null) {
            accumulator.add(1);
         }
      }
      

       

      Or at least, it should. But my problem is that it isn't the case.

      During Dataset constitution :

      1) If I initialize the historiqueExecution variable with the exhaustive list of messages it can have to count, before the worker.get() is called by the constitutionStandard method, the dataset is perfectly constituted and I can dump my counters :

      {code:Java}
      historiqueExecution.setCodesMessages(this.session,
         "etablissement sans SIRET, peut être étranger : '{}'",
         "etablissement au SIRET '{}' invalide, écarté.",
         "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté.",
         "etablissement au SIREN d'entreprise '{}' invalide, écarté.",
         "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.",
         "établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue.",
         "établissement de SIRET {} écarté : il n'a pas de code APE.",
         "établissement de SIRET {} écarté : son code APE {} est invalide.",
         "établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide",
         "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide",
         "établissement de SIRET {} écarté : sa date de création, {}, est invalide",
         "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {}",
         "établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide");// code placeholder
      
      etablissement au SIRET '{}' invalide, écarté. : 0
      établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue. : 0
      etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté. : 0
      etablissement sans SIRET, peut être étranger : '{}' : 0
      établissement de SIRET {} écarté : son code APE {} est invalide. : 0
      établissement de SIRET {} écarté : il n'a pas de code APE. : 0
      établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide : 0
      établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide : 2
      établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {} : 0
      etablissement au SIREN d'entreprise '{}' invalide, écarté. : 0
      établissement de SIRET {} écarté : il n'a pas de nomemclature APE. : 0
      établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide : 0
      établissement de SIRET {} écarté : sa date de création, {}, est invalide : 0

      2) But if I don't initialize that list, and that I leave the creation of the missing longAccumulator to the worker itself, at runtime (filter time), then the :

      session.sparkContext().longAccumulator(codeOuFormatMessage);

      fails on a NullPointerException, as sparkContext() returns null.

       

      My first question is : is it normal that, when taking the real actions to build the dataset, the spark session returns a null context?

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            mlebihan Marc Le Bihan
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: