Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Not A Problem
-
2.3.0
-
None
Description
I am completely stuck on this issue and unable to make further progress. For more information, please refer to this post: https://stackoverflow.com/questions/49994085/spark-sql-2-3-dataframe-savemode-append-issue
I want to load multiple files one at a time rather than all at once. To achieve this I used SaveMode.Append, so that the second file's data would be appended to the first file's data in the database, but it throws the following exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false;; 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false +- LogicalRDD [a1#22, b1#23, c1#24, d1#25], false
Code:
package com.log; import com.log.common.RegexMatch; import com.log.spark.SparkProcessor; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.storage.StorageLevel; import java.util.ArrayList; import java.util.List; public class TestApp { private SparkSession spark; private SparkContext sparkContext; private SQLContext sqlContext; public TestApp() { SparkSession spark = SparkSession.builder().appName("Simple Application") .config("spark.master", "local").getOrCreate(); SparkContext sc = spark.sparkContext(); this.spark = spark; this.sparkContext = sc; } public static void main(String[] args) { TestApp app = new TestApp(); String[] afiles = {"C:\\Users\\test\\Desktop\\logs\\log1.txt", "C:\\Users\\test\\Desktop\\logs\\log2.txt"}; for (String file : afiles) { app.writeFileToSchema(file); } } public void writeFileToSchema(String filePath) { StructType schema = getSchema(); JavaRDD<Row> rowRDD = getRowRDD(filePath); if (spark.catalog().tableExists("mylogs")) { logDataFrame = spark.createDataFrame(rowRDD, schema); logDataFrame.createOrReplaceTempView("temptable"); logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs");//exception } else { logDataFrame = spark.createDataFrame(rowRDD, schema); logDataFrame.createOrReplaceTempView("mylogs"); } Dataset<Row> results = spark.sql("SELECT count(b1) FROM mylogs"); List<Row> allrows = results.collectAsList(); System.out.println("Count:"+allrows); sqlContext = logDataFrame.sqlContext(); } Dataset<Row> logDataFrame; public List<Row> getTagList() { Dataset<Row> results = sqlContext.sql("SELECT distinct(b1) FROM mylogs"); List<Row> allrows = results.collectAsList(); return allrows; } public StructType getSchema() { String schemaString = "a1 b1 c1 d1"; 
List<StructField> fields = new ArrayList<>(); for (String fieldName : schemaString.split(" ")) { StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true); fields.add(field); } StructType schema = DataTypes.createStructType(fields); return schema; } public JavaRDD<Row> getRowRDD(String filePath) { JavaRDD<String> logRDD = sparkContext.textFile(filePath, 1).toJavaRDD(); RegexMatch reg = new RegexMatch(); JavaRDD<Row> rowRDD = logRDD .map((Function<String, Row>) line -> { String[] st = line.split(" "); return RowFactory.create(st[0], st[1], st[2], st[3]); }); rowRDD.persist(StorageLevel.MEMORY_ONLY()); return rowRDD; } }