diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java index 13848b6..9d58d13 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java @@ -29,6 +29,8 @@ import org.apache.hadoop.mapred.JobConf; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; import java.util.List; import java.util.Properties; @@ -66,13 +68,17 @@ public static Schema determineSchemaOrThrowException(Properties properties) throw new AvroSerdeException(EXCEPTION_MESSAGE); try { - if(schemaString.toLowerCase().startsWith("hdfs://")) - return getSchemaFromHDFS(schemaString, new Configuration()); - } catch(IOException ioe) { - throw new AvroSerdeException("Unable to read schema from HDFS: " + schemaString, ioe); + Schema s = getSchemaFromFS(schemaString, new Configuration()); + if (s == null) { + //in case schema is not a file system + return Schema.parse(new URL(schemaString).openStream()); + } + return s; + } catch (IOException ioe) { + throw new AvroSerdeException("Unable to read schema from given path: " + schemaString, ioe); + } catch (URISyntaxException urie) { + throw new AvroSerdeException("Unable to read schema from given path: " + schemaString, urie); } - - return Schema.parse(new URL(schemaString).openStream()); } /** @@ -96,13 +102,20 @@ public static Schema determineSchemaOrReturnErrorSchema(Properties props) { } } // Protected for testing and so we can pass in a conf for testing. - protected static Schema getSchemaFromHDFS(String schemaHDFSUrl, - Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(conf); + protected static Schema getSchemaFromFS(String schemaFSUrl, + Configuration conf) throws IOException, URISyntaxException { FSDataInputStream in = null; - + FileSystem fs = null; + try { + fs = FileSystem.get(new URI(schemaFSUrl), conf); + } catch (IOException ioe) { + //return null only if the file system in schema is not recognized + String msg = "Failed to open file system for uri " + schemaFSUrl + " assuming it is not a FileSystem url"; + LOG.debug(msg, ioe); + return null; + } try { - in = fs.open(new Path(schemaHDFSUrl)); + in = fs.open(new Path(schemaFSUrl)); Schema s = Schema.parse(in); return s; } finally { diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerdeUtils.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerdeUtils.java index 010f614..67d5570 100644 --- serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerdeUtils.java +++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerdeUtils.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.net.MalformedURLException; +import java.net.URISyntaxException; import java.util.Properties; import static org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.EXCEPTION_MESSAGE; @@ -137,8 +138,8 @@ public void detemineSchemaTriesToOpenUrl() throws AvroSerdeException, IOExceptio try { AvroSerdeUtils.determineSchemaOrThrowException(props); fail("Should have tried to open that URL"); - } catch(MalformedURLException e) { - assertEquals("unknown protocol: not", e.getMessage()); + } catch (AvroSerdeException e) { + assertEquals("Unable to read schema from given path: not:///a.real.url", e.getMessage()); } } @@ -173,13 +174,14 @@ public void noneOptionWorksForSpecifyingSchemas() throws IOException, AvroSerdeE try { determineSchemaOrThrowException(props); fail("Should have tried to open that bogus URL"); - } catch(MalformedURLException e) { - assertEquals("unknown protocol: not", e.getMessage()); + } catch (AvroSerdeException e) { + assertEquals("Unable to read schema from given path: not:///a.real.url", e.getMessage()); } } @Test - public void determineSchemaCanReadSchemaFromHDFS() throws IOException, AvroSerdeException { + public void determineSchemaCanReadSchemaFromHDFS() throws IOException, AvroSerdeException, + URISyntaxException{ String schemaString = TestAvroObjectInspectorGenerator.RECORD_SCHEMA; MiniDFSCluster miniDfs = null; try { @@ -194,7 +196,7 @@ public void determineSchemaCanReadSchemaFromHDFS() throws IOException, AvroSerde String onHDFS = miniDfs.getFileSystem().getUri() + "/path/to/schema/schema.avsc"; Schema schemaFromHDFS = - AvroSerdeUtils.getSchemaFromHDFS(onHDFS, miniDfs.getFileSystem().getConf()); + AvroSerdeUtils.getSchemaFromFS(onHDFS, miniDfs.getFileSystem().getConf()); Schema expectedSchema = Schema.parse(schemaString); assertEquals(expectedSchema, schemaFromHDFS); } finally {