diff --git hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java index ae60030..3d114e3 100644 --- hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java +++ hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java @@ -195,6 +195,7 @@ public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldS private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema, Schema pigSchema, HCatSchema tableSchema) throws FrontendException, HCatException { + String fieldName = getPigAliasWithoutNamespace(fSchema.alias); if(hcatFieldSchema == null) { if(LOG.isDebugEnabled()) { LOG.debug("hcatFieldSchema is null for fSchema '" + fSchema.alias + "'"); @@ -208,40 +209,40 @@ private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema case DataType.CHARARRAY: case DataType.BIGCHARARRAY: if(hcatFieldSchema != null && hcatFieldSchema.getTypeInfo() != null) { - return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null); + return new HCatFieldSchema(fieldName, hcatFieldSchema.getTypeInfo(), null); } - return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.stringTypeInfo, null); + return new HCatFieldSchema(fieldName, TypeInfoFactory.stringTypeInfo, null); case DataType.INTEGER: if (hcatFieldSchema != null) { if (!SUPPORTED_INTEGER_CONVERSIONS.contains(hcatFieldSchema.getType())) { throw new FrontendException("Unsupported type: " + type + " in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE); } - return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null); + return new HCatFieldSchema(fieldName, hcatFieldSchema.getTypeInfo(), null); } - return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.intTypeInfo, null); + return new HCatFieldSchema(fieldName, 
TypeInfoFactory.intTypeInfo, null); case DataType.LONG: - return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.longTypeInfo, null); + return new HCatFieldSchema(fieldName, TypeInfoFactory.longTypeInfo, null); case DataType.FLOAT: - return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.floatTypeInfo, null); + return new HCatFieldSchema(fieldName, TypeInfoFactory.floatTypeInfo, null); case DataType.DOUBLE: - return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.doubleTypeInfo, null); + return new HCatFieldSchema(fieldName, TypeInfoFactory.doubleTypeInfo, null); case DataType.BYTEARRAY: - return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.binaryTypeInfo, null); + return new HCatFieldSchema(fieldName, TypeInfoFactory.binaryTypeInfo, null); case DataType.BOOLEAN: - return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.booleanTypeInfo, null); + return new HCatFieldSchema(fieldName, TypeInfoFactory.booleanTypeInfo, null); case DataType.DATETIME: //Pig DATETIME can map to DATE or TIMESTAMP (see HCatBaseStorer#validateSchema()) which //is controlled by Hive target table information if(hcatFieldSchema != null && hcatFieldSchema.getTypeInfo() != null) { - return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null); + return new HCatFieldSchema(fieldName, hcatFieldSchema.getTypeInfo(), null); } - return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.timestampTypeInfo, null); + return new HCatFieldSchema(fieldName, TypeInfoFactory.timestampTypeInfo, null); case DataType.BIGDECIMAL: if(hcatFieldSchema != null && hcatFieldSchema.getTypeInfo() != null) { - return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null); + return new HCatFieldSchema(fieldName, hcatFieldSchema.getTypeInfo(), null); } - return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.decimalTypeInfo, null); + return new HCatFieldSchema(fieldName, TypeInfoFactory.decimalTypeInfo, null); case DataType.BAG: Schema bagSchema = fSchema.schema; List<HCatFieldSchema> 
arrFields = new ArrayList<HCatFieldSchema>(1); @@ -254,7 +255,7 @@ private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema } arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema .getArrayElementSchema().get(0), pigSchema, tableSchema)); - return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), ""); + return new HCatFieldSchema(fieldName, Type.ARRAY, new HCatSchema(arrFields), ""); case DataType.TUPLE: List<HCatFieldSchema> hcatFSs = new ArrayList<HCatFieldSchema>(); HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema(); @@ -263,7 +264,7 @@ private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema FieldSchema fieldSchema = fields.get(i); hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i), pigSchema, tableSchema)); } - return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), ""); + return new HCatFieldSchema(fieldName, Type.STRUCT, new HCatSchema(hcatFSs), ""); case DataType.MAP: { // Pig's schema contain no type information about map's keys and // values. So, if its a new column assume <string,string> if its existing @@ -273,14 +274,14 @@ private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1); if (hcatFieldSchema != null) { - return HCatFieldSchema.createMapTypeFieldSchema(fSchema.alias, hcatFieldSchema.getMapKeyTypeInfo(), + return HCatFieldSchema.createMapTypeFieldSchema(fieldName, hcatFieldSchema.getMapKeyTypeInfo(), hcatFieldSchema.getMapValueSchema(), ""); } // Column not found in target table. Its a new column. 
Its schema is map<string,string> - valFS = new HCatFieldSchema(fSchema.alias, TypeInfoFactory.stringTypeInfo, ""); + valFS = new HCatFieldSchema(fieldName, TypeInfoFactory.stringTypeInfo, ""); valFSList.add(valFS); - return HCatFieldSchema.createMapTypeFieldSchema(fSchema.alias, + return HCatFieldSchema.createMapTypeFieldSchema(fieldName, TypeInfoFactory.stringTypeInfo, new HCatSchema(valFSList), ""); } case DataType.BIGINTEGER: @@ -608,11 +609,12 @@ private static void throwTypeMismatchException(byte pigDataType, } private void validateAlias(String alias) throws FrontendException { - if (alias == null) { + String fieldName = getPigAliasWithoutNamespace(alias); + if (fieldName == null) { throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE); } - if (alias.matches(".*[A-Z]+.*")) { - throw new FrontendException("Column names should all be in lowercase. Invalid name found: " + alias, PigHCatUtil.PIG_EXCEPTION_CODE); + if (fieldName.matches(".*[A-Z]+.*")) { + throw new FrontendException("Column names should all be in lowercase. 
Invalid name found: " + fieldName, PigHCatUtil.PIG_EXCEPTION_CODE); } } @@ -620,7 +622,8 @@ private void validateAlias(String alias) throws FrontendException { private HCatFieldSchema getColFromSchema(String alias, HCatSchema tblSchema) { if (tblSchema != null) { for (HCatFieldSchema hcatField : tblSchema.getFields()) { - if (hcatField != null && hcatField.getName() != null && hcatField.getName().equalsIgnoreCase(alias)) { + String fieldName = getPigAliasWithoutNamespace(alias); + if (hcatField != null && hcatField.getName() != null && hcatField.getName().equalsIgnoreCase(fieldName)) { return hcatField; } } @@ -629,6 +632,14 @@ private HCatFieldSchema getColFromSchema(String alias, HCatSchema tblSchema) { return null; } + private String getPigAliasWithoutNamespace(String alias) { + if (alias != null && alias.contains("::")) { + return StringUtils.substringAfterLast(alias, "::"); + } + // Doesn't have a namespace prepended + return alias; + } + @Override public void cleanupOnFailure(String location, Job job) throws IOException { // No-op. 
diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java index fcfc642..9c7a98d 100644 --- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java +++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java @@ -733,6 +733,115 @@ public void testBagNStruct() throws IOException, CommandNeedRetryException { } @Test + public void testAliasWithoutNamespace() throws IOException, CommandNeedRetryException { + + driver.run("drop table junit_unparted"); + String createTable = "create table junit_unparted(a int, b string) stored as " + getStorageFormat(); + int retCode = driver.run(createTable).getResponseCode(); + if (retCode != 0) { + throw new IOException("Failed to create table."); + } + int LOOP_SIZE = 11; + String[] input = new String[LOOP_SIZE]; + for (int i = 0; i < LOOP_SIZE; i++) { + input[i] = i + "\t1"; + } + HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input); + PigServer server = new PigServer(ExecType.LOCAL); + server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);"); + server.registerQuery("G = group A by a;"); + server.registerQuery("F = foreach G generate flatten($1);"); + server.registerQuery("store F into 'default.junit_unparted' using " + HCatStorer.class.getName() + "();"); + server.registerQuery("B = load 'default.junit_unparted' using " + HCatLoader.class.getName() + "();"); + Iterator<Tuple> itr = server.openIterator("B"); + + int i = 0; + + while (itr.hasNext()) { + Tuple t = itr.next(); + Assert.assertEquals(2, t.size()); + Assert.assertEquals(t.get(0), i); + Assert.assertEquals(t.get(1), "1"); + i++; + } + + Assert.assertFalse(itr.hasNext()); + Assert.assertEquals(LOOP_SIZE, i); + } + + @Test + public void testNamespaceInAliasPartCols() throws IOException, CommandNeedRetryException { + + driver.run("drop table 
junit_unparted"); + String createTable = "create table junit_unparted(a int) partitioned by (b string) stored as " + getStorageFormat(); + int retCode = driver.run(createTable).getResponseCode(); + if (retCode != 0) { + throw new IOException("Failed to create table."); + } + int LOOP_SIZE = 11; + String[] input = new String[LOOP_SIZE]; + for (int i = 0; i < LOOP_SIZE; i++) { + input[i] = i + "\t1"; + } + HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input); + PigServer server = new PigServer(ExecType.LOCAL); + server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);"); + server.registerQuery("G = group A by a;"); + server.registerQuery("F = foreach G generate flatten($1);"); + server.registerQuery("store F into 'default.junit_unparted' using " + HCatStorer.class.getName() + "('b=1');"); + server.registerQuery("B = load 'default.junit_unparted' using " + HCatLoader.class.getName() + "();"); + Iterator<Tuple> itr = server.openIterator("B"); + + int i = 0; + + while (itr.hasNext()) { + Tuple t = itr.next(); + Assert.assertEquals(2, t.size()); + Assert.assertEquals(t.get(0), i); + Assert.assertEquals(t.get(1), "1"); + i++; + } + + Assert.assertFalse(itr.hasNext()); + Assert.assertEquals(LOOP_SIZE, i); + } + + @Test + public void testAliasWithoutNamespaceBagNStruct() throws IOException, CommandNeedRetryException { + driver.run("drop table junit_unparted"); + String createTable = "create table junit_unparted(b string,a struct<a1:int>, arr_of_struct array<string>, " + "arr_of_struct2 array<struct<s1:string,s2:string>>, arr_of_struct3 array<struct<s3:string>>) stored as " + getStorageFormat(); + int retCode = driver.run(createTable).getResponseCode(); + if (retCode != 0) { + throw new IOException("Failed to create table."); + } + + String[] inputData = new String[]{"zookeeper\t(2)\t{(pig)}\t{(pnuts,hdfs)}\t{(hadoop),(hcat)}", + "chubby\t(2)\t{(sawzall)}\t{(bigtable,gfs)}\t{(mapreduce),(hcat)}"}; + + HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, inputData); + + PigServer server = new 
PigServer(ExecType.LOCAL); + server.setBatchOn(); + server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (b:chararray, a:tuple(a1:int), arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)});"); + server.registerQuery("G = group A by a;"); + server.registerQuery("F = foreach G generate flatten($1);"); + server.registerQuery("store F into 'default.junit_unparted' using " + HCatStorer.class.getName() + "();"); + server.executeBatch(); + + driver.run("select * from junit_unparted"); + ArrayList<String> res = new ArrayList<String>(); + driver.getResults(res); + driver.run("drop table junit_unparted"); + Iterator<String> itr = res.iterator(); + Assert.assertEquals("chubby\t{\"a1\":2}\t[\"sawzall\"]\t[{\"s1\":\"bigtable\",\"s2\":\"gfs\"}]\t[{\"s3\":\"mapreduce\"},{\"s3\":\"hcat\"}]", itr.next()); + Assert.assertEquals("zookeeper\t{\"a1\":2}\t[\"pig\"]\t[{\"s1\":\"pnuts\",\"s2\":\"hdfs\"}]\t[{\"s3\":\"hadoop\"},{\"s3\":\"hcat\"}]", itr.next()); + Assert.assertFalse(itr.hasNext()); + + } + + + @Test public void testStoreFuncAllSimpleTypes() throws IOException, CommandNeedRetryException { driver.run("drop table junit_unparted");