SPARK-24089

DataFrame.write().mode(SaveMode.Append).insertInto(TABLE)


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: Java API, Spark Core, SQL

    Description

      I am completely stuck on this issue and unable to make progress. For more details, please see this post: https://stackoverflow.com/questions/49994085/spark-sql-2-3-dataframe-savemode-append-issue

      I want to load multiple files one by one rather than loading them all at once. To achieve this I used SaveMode.Append, so that the second file's data is added to the first file's data in the table, but it throws the following exception:

      Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false;;
      'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false
      +- LogicalRDD [a1#22, b1#23, c1#24, d1#25], false
      

      Code:

      package com.log;
      
      import org.apache.spark.SparkContext;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.sql.*;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructField;
      import org.apache.spark.sql.types.StructType;
      import org.apache.spark.storage.StorageLevel;
      
      import java.util.ArrayList;
      import java.util.List;
      
      public class TestApp {
      
          private SparkSession spark;
          private SparkContext sparkContext;
          private SQLContext sqlContext;
          private Dataset<Row> logDataFrame;
      
          public TestApp() {
              SparkSession spark = SparkSession.builder().appName("Simple Application")
                      .config("spark.master", "local").getOrCreate();
      
              SparkContext sc = spark.sparkContext();
      
              this.spark = spark;
              this.sparkContext = sc;
          }
      
          public static void main(String[] args) {
              TestApp app = new TestApp();
      
              String[] afiles = {"C:\\Users\\test\\Desktop\\logs\\log1.txt",
                      "C:\\Users\\test\\Desktop\\logs\\log2.txt"};
      
              for (String file : afiles) {
                  app.writeFileToSchema(file);
              }
          }
      
          public void writeFileToSchema(String filePath) {
      
              StructType schema = getSchema();
              JavaRDD<Row> rowRDD = getRowRDD(filePath);
      
              if (spark.catalog().tableExists("mylogs")) {
                  // "mylogs" is already registered: try to append this file's rows to it.
                  logDataFrame = spark.createDataFrame(rowRDD, schema);
                  logDataFrame.createOrReplaceTempView("temptable");
                  logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs"); // throws the AnalysisException above
              } else {
                  // First file: register the DataFrame as a temp view named "mylogs".
                  logDataFrame = spark.createDataFrame(rowRDD, schema);
                  logDataFrame.createOrReplaceTempView("mylogs");
              }
      
              Dataset<Row> results = spark.sql("SELECT count(b1) FROM mylogs");
      
              List<Row> allrows = results.collectAsList();
      
              System.out.println("Count:"+allrows);
      
              sqlContext = logDataFrame.sqlContext();
          }
      
          public List<Row> getTagList() {
      
              Dataset<Row> results = sqlContext.sql("SELECT distinct(b1) FROM mylogs");
              List<Row> allrows = results.collectAsList();
      
              return allrows;
          }
      
          public StructType getSchema() {
              String schemaString = "a1 b1 c1 d1";
      
              List<StructField> fields = new ArrayList<>();
              for (String fieldName : schemaString.split(" ")) {
                  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
                  fields.add(field);
              }
      
              StructType schema = DataTypes.createStructType(fields);
      
              return schema;
          }
      
      
          public JavaRDD<Row> getRowRDD(String filePath) {
      
              JavaRDD<String> logRDD = sparkContext.textFile(filePath, 1).toJavaRDD();
      
              // Split each line on spaces and build a Row with the four string columns.
              JavaRDD<Row> rowRDD = logRDD
                      .map((Function<String, Row>) line -> {
                          String[] st = line.split(" ");
                          return RowFactory.create(st[0], st[1], st[2], st[3]);
                      });
      
              rowRDD.persist(StorageLevel.MEMORY_ONLY());
      
              return rowRDD;
          }
      }
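
      For context on the "Not A Problem" resolution: insertInto() looks up its target in the catalog, and here "mylogs" exists only as a temp view over an in-memory RDD (the LogicalRDD in the plan above), which is not an insertable table; that appears to be why the InsertIntoTable operator stays unresolved. Below is a minimal sketch, not from the original report, of one way to append file by file into a real table instead: saveAsTable() creates a managed table for the first file, and SaveMode.Append adds each later file. The class name AppendPerFile is made up for illustration; the paths and the a1/b1/c1/d1 schema are reused from the code above.

      import java.util.ArrayList;
      import java.util.List;

      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.RowFactory;
      import org.apache.spark.sql.SaveMode;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructField;
      import org.apache.spark.sql.types.StructType;

      public class AppendPerFile {

          public static void main(String[] args) {
              SparkSession spark = SparkSession.builder().appName("Append per file")
                      .config("spark.master", "local").getOrCreate();

              String[] files = {"C:\\Users\\test\\Desktop\\logs\\log1.txt",
                      "C:\\Users\\test\\Desktop\\logs\\log2.txt"};

              // Same four string columns as getSchema() above.
              List<StructField> fields = new ArrayList<>();
              for (String name : "a1 b1 c1 d1".split(" ")) {
                  fields.add(DataTypes.createStructField(name, DataTypes.StringType, true));
              }
              StructType schema = DataTypes.createStructType(fields);

              for (String file : files) {
                  // Same parsing as getRowRDD(): split each line on spaces into four columns.
                  JavaRDD<Row> rows = spark.sparkContext().textFile(file, 1).toJavaRDD()
                          .map((Function<String, Row>) line -> {
                              String[] st = line.split(" ");
                              return RowFactory.create(st[0], st[1], st[2], st[3]);
                          });
                  Dataset<Row> df = spark.createDataFrame(rows, schema);

                  if (spark.catalog().tableExists("mylogs")) {
                      // Table already exists in the catalog: append this file's rows to it.
                      df.write().mode(SaveMode.Append).saveAsTable("mylogs");
                  } else {
                      // First file: create "mylogs" as a managed table (not a temp view),
                      // so later iterations have a real table to append to.
                      df.write().saveAsTable("mylogs");
                  }

                  spark.sql("SELECT count(b1) FROM mylogs").show();
              }
          }
      }

      An alternative that avoids tables entirely is to keep a running Dataset and union() each new file's DataFrame into it before querying.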
      

          People

            Assignee: Unassigned
            Reporter: rkrgarlapati kumar