Index: src/java/org/apache/hcatalog/tools/growl/Crawler.java
===================================================================
--- src/java/org/apache/hcatalog/tools/growl/Crawler.java (revision 0)
+++ src/java/org/apache/hcatalog/tools/growl/Crawler.java (revision 0)
@@ -0,0 +1,9 @@
+package org.apache.hcatalog.tools.growl;
+
+public interface Crawler {
+
+  public void crawl() throws Exception;
+
+  public void registerMetadataCallback(MetadataCallback mdc);
+
+}
Index: src/java/org/apache/hcatalog/tools/growl/taskbased/TaskBasedCrawler.java
===================================================================
--- src/java/org/apache/hcatalog/tools/growl/taskbased/TaskBasedCrawler.java (revision 0)
+++ src/java/org/apache/hcatalog/tools/growl/taskbased/TaskBasedCrawler.java (revision 0)
@@ -0,0 +1,84 @@
+package org.apache.hcatalog.tools.growl.taskbased;
+
+import java.util.Collection;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hcatalog.tools.growl.Crawler;
+import org.apache.hcatalog.tools.growl.GrowlUtil;
+import org.apache.hcatalog.tools.growl.MetadataCallback;
+import org.apache.hcatalog.tools.growl.PartitionProxy;
+import org.apache.hcatalog.tools.growl.TableProxy;
+
+public class TaskBasedCrawler implements Crawler {
+
+  public static final String GROWL_CRAWL_TASK_IMPL = "CrawlTaskImpl";
+  private MetadataCallback mdc = null;
+  private CrawlTask task = null;
+
+  public TaskBasedCrawler() throws Exception {
+    task = (CrawlTask) GrowlUtil.instantiateFromProperties(GROWL_CRAWL_TASK_IMPL);
+  }
+
+  private void registerPreDepthCrawl(CrawlNode item) throws Exception {
+    if (item.isRoot()){
+      // root - create table
+      TableProxy tableProxy = task.getTableProxy(item);
+      if (tableProxy != null){
+        Table tableInMetadata = mdc.createTable(tableProxy, task.getDuplicateTableStrategy());
+        task.supplyTableInfo(item, new TableProxy(tableInMetadata));
+      }
+    } else if (item.isLeaf()){
+      // leaf and non-root mapping - create partitions
+      // (cases where leaf and root are the same are cases of unpartitioned tables, no partition publish for them)
+      PartitionProxy pproxy = task.getPartitionProxy(item);
+      if (pproxy != null){
+        Partition partitionInMetadata = mdc.createPartition(pproxy, task.getDuplicatePartitionStrategy());
+        task.supplyPartitionInfo(item, new PartitionProxy(partitionInMetadata));
+      }
+    }
+  }
+
+  private void registerPostDepthCrawl(CrawlNode item) throws Exception {
+    // noop
+  }
+
+  private void crawl(CrawlNode item) throws Exception {
+    if (task.isCrawlable(item)){
+      System.out.println("Visiting " + item);
+      registerPreDepthCrawl(item);
+      if (!item.isLeaf()){
+        for (CrawlNode child : item.getChildren()){
+          crawl(child);
+        }
+      }
+      registerPostDepthCrawl(item);
+    } else {
+      System.out.println("Skipping " + item);
+    }
+  }
+
+  private void testMdc() throws Exception {
+    if (mdc == null){
+      throw new Exception("Metadata callback not registered yet! Call registerMetadataCallback() first");
+    }
+  }
+
+  @Override
+  public void crawl() throws Exception {
+    testMdc();
+    Collection<CrawlNode> roots = task.getRootItems();
+    for (CrawlNode root : roots){
+      crawl(root);
+    }
+  }
+
+  @Override
+  public void registerMetadataCallback(MetadataCallback mdc) {
+    this.mdc = mdc;
+  }
+
+}
Index: src/java/org/apache/hcatalog/tools/growl/taskbased/FSCrawlTask.java
===================================================================
--- src/java/org/apache/hcatalog/tools/growl/taskbased/FSCrawlTask.java (revision 0)
+++ src/java/org/apache/hcatalog/tools/growl/taskbased/FSCrawlTask.java (revision 0)
@@ -0,0 +1,149 @@
+package org.apache.hcatalog.tools.growl.taskbased;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hcatalog.tools.growl.Growl;
+import org.apache.hcatalog.tools.growl.MetadataCallback;
+import org.apache.hcatalog.tools.growl.PartitionProxy;
+import org.apache.hcatalog.tools.growl.TableProxy;
+
+public class FSCrawlTask extends CrawlTask {
+
+  protected List<CrawlNode> roots = null;
+
+  private final Map<String, TableProxy> tpMap;
+
+  public FSCrawlTask() throws Exception {
+    super();
+    Configuration conf = new Configuration();
+    roots = new ArrayList<CrawlNode>();
+
+    for (String feedName : taskConf.getFeeds()){
+      roots.add(new FileSystemNode(conf, taskConf.getRoot(feedName), taskConf.getDepth(feedName)));
+    }
+
+    tpMap = new HashMap<String, TableProxy>();
+  }
+
+  @Override
+  public Collection<CrawlNode> getRootItems() {
+    return roots;
+  }
+
+  @Override
+  public TableProxy getTableProxy(CrawlNode item) throws Exception {
+    String root = ((FileSystemNode) item.getTreeRoot()).toString();
+    if (tpMap.containsKey(root)){
+      return tpMap.get(root);
+    } else {
+      String feedName = taskConf.getFeedNameByRoot(root);
+
+      TableProxy tp = (new TableProxy(feedName))
+        .inDb(taskConf.getDbToUse())
+        .withLocation(root)
+        .withTableSchema(taskConf.getTableSchema(feedName))
+        .withPartitioningSchema(taskConf.getPartitioningSchema(feedName))
+        .withInputFormat(taskConf.getTableIf(feedName))
+        .withOutputFormat(taskConf.getTableOf(feedName))
+        .withSerde(taskConf.getTableSerde(feedName))
+        ;
+      for (Entry<String, String> param : taskConf.getTableParameters(feedName).entrySet()){
+        tp.withParameter(param.getKey(), param.getValue());
+      }
+
+      tpMap.put(root, tp);
+      return tp;
+    }
+  }
+
+  @Override
+  public void supplyTableInfo(CrawlNode item, TableProxy tableInMetadata)
+      throws Exception {
+    if (tableInMetadata != null){
+      String root = ((FileSystemNode) item.getTreeRoot()).toString();
+      tpMap.put(root, tableInMetadata);
+    } else {
+      throw new Exception("Feedback Table was null! Table provided not registered with metadata.");
+    }
+  }
+
+  @Override
+  public PartitionProxy getPartitionProxy(CrawlNode item) throws Exception {
+    if (item.isLeaf()){
+      List<String> pvals = new ArrayList<String>();
+      CrawlNode tc = item;
+      CrawlNode treeRoot = item.getTreeRoot();
+      String locnSuffix = "";
+      while (tc != treeRoot){
+        String name = ((FileSystemNode) tc).getPath().getName();
+        locnSuffix = name + "/" + locnSuffix;
+        pvals.add(name);
+        tc = tc.getParent();
+      }
+      Collections.reverse(pvals);
+
+      Table tbl = getTableProxy(treeRoot).getTable();
+      String feedName = tbl.getTableName();
+      PartitionProxy pp = (new PartitionProxy())
+        .inTable(tbl)
+        .withValues(pvals)
+        .withLocationSuffix(locnSuffix)
+        .withPartitionSchema(taskConf.getPartitionSchemaMapper(feedName).getSchema(locnSuffix))
+        .withInputFormat(taskConf.getPartitionInputFormat(feedName))
+        .withSerde(taskConf.getPartitionSerde(feedName))
+        ;
+      for (Entry<String, String> param : taskConf.getPartitionParameters(feedName).entrySet()){
+        pp.withParameter(param.getKey(), param.getValue());
+      }
+      return pp;
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void supplyPartitionInfo(CrawlNode item, PartitionProxy ptnInMetadata)
+      throws Exception {
+    if (ptnInMetadata == null){
+      throw new Exception("Feedback partition was null! Partition provided not registered with metadata.");
+    }
+  }
+
+  @Override
+  public boolean isCrawlable(CrawlNode item) throws Exception {
+    for (String excludesRegex : taskConf.getExcludesRegexes()){
+      if (((FileSystemNode) item).toString().matches(excludesRegex)){
+//        System.out.println("Whoops, item["+((FileSystemNode)item).toString()+"] matched excludes regex ["+excludesRegex+"]");
+        return false;
+      }
+    }
+    if (taskConf.getIncludesRegexes().size() == 0){
+      return true; // no include regexes specified, so we pass.
+    }
+    for (String includesRegex : taskConf.getIncludesRegexes()){
+      if (((FileSystemNode) item).toString().matches(includesRegex)){
+//        System.out.println("Phew, item["+((FileSystemNode)item).toString()+"] matched includes regex ["+includesRegex+"]");
+        return true; // return true if even one of the includes regexes match.
+      }
+    }
+    return false; // if we got here, then includes-regexes were specified, but we matched none of them - return false.
+ } + + public static void main(String args[]) throws Exception{ + System.getProperties().put(Growl.GROWL_CRAWLER_IMPL, TaskBasedCrawler.class.getName()); + System.getProperties().put(TaskBasedCrawler.GROWL_CRAWL_TASK_IMPL,FSCrawlTask.class.getName()); + + Growl.main(null); + MetadataCallback.main(null); + } +} Index: src/java/org/apache/hcatalog/tools/growl/taskbased/HCatSchemaMapper.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/taskbased/HCatSchemaMapper.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/taskbased/HCatSchemaMapper.java (revision 0) @@ -0,0 +1,15 @@ +package org.apache.hcatalog.tools.growl.taskbased; + +import org.apache.hcatalog.data.schema.HCatSchema; + +public interface HCatSchemaMapper { + + public void init(String input) throws Exception; + + // a no-argument form - called for partitioning schema and table schema + public HCatSchema getSchema() throws Exception; + + // with a partition location suffix as an argument + public HCatSchema getSchema(String locnSuffix) throws Exception; + +} Index: src/java/org/apache/hcatalog/tools/growl/taskbased/SimpleCrawlNode.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/taskbased/SimpleCrawlNode.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/taskbased/SimpleCrawlNode.java (revision 0) @@ -0,0 +1,67 @@ +package org.apache.hcatalog.tools.growl.taskbased; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + + +public class SimpleCrawlNode extends CrawlNode { + + String name = null; + SimpleCrawlNode parent = null; + SimpleCrawlNode root = null; + List children = null; + + public SimpleCrawlNode(String name){ + this.name = name; + this.root = this; + } + + public SimpleCrawlNode(String name, SimpleCrawlNode parent){ + this(name); + this.parent = parent; + this.root = (SimpleCrawlNode) parent.getTreeRoot(); + this.parent.registerChild(this); + } + + private void registerChild(SimpleCrawlNode entry) { + checkOnChildren(); + children.add(entry); + } + + @SuppressWarnings("unused") + private SimpleCrawlNode(){ + // private default ctor to prevent usage + } + + @Override + public List getChildren() throws IOException { + checkOnChildren(); + return children; + } + + private void checkOnChildren() { + if (children == null){ + children = new ArrayList(); + } + } + + @Override + public CrawlNode getParent() { + return parent; + } + + @Override + public CrawlNode getTreeRoot() { + return root; + } + + public String getName() { + return name; + } + + public String toString() { + return root.getName() + "->" + name; + } + +} Index: src/java/org/apache/hcatalog/tools/growl/taskbased/TaskConf.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/taskbased/TaskConf.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/taskbased/TaskConf.java (revision 0) @@ -0,0 +1,282 @@ +package org.apache.hcatalog.tools.growl.taskbased; + +import java.io.File; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +import org.apache.hcatalog.data.schema.HCatSchema; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import 
org.w3c.dom.NodeList; + +public class TaskConf { + + Map> feedMap; + Map> tablePropertiesMap; + Map> ptnPropertiesMap; + Map schemas; + Map schemaMappers; + Map hschemaMapperCache; + Map feedNamesByRoot; + List excludes; + List includes; + + String dbToUse = null; + + public TaskConf(String confFileName) throws Exception { + System.out.println("opening task config ["+confFileName+"]"); + DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder(); + Document doc = docBuilder.parse (new File(confFileName)); + doc.getDocumentElement().normalize(); + schemas = new HashMap(); + schemaMappers = new HashMap(); + + NodeList listOfSchema = doc.getElementsByTagName("hcatschema"); + for (int i=0; i< listOfSchema.getLength() ; i++){ + Node snode = listOfSchema.item(i); + if (snode.getNodeType() == Node.ELEMENT_NODE){ + Element schemaElement = (Element) snode; + + String sname = getNodeFieldValue(schemaElement,"name"); + String svalue = getNodeFieldValue(schemaElement,"value"); + String smapper = getNodeFieldValue(schemaElement,"mapper"); + + schemas.put(sname,svalue); + schemaMappers.put(sname,smapper); + + // System.out.println("Schema Name : " + sname + " value : " + svalue + " mapper : " + smapper); + + } + } + + feedMap = new HashMap>(); + tablePropertiesMap = new HashMap>(); + ptnPropertiesMap = new HashMap>(); + + NodeList listOfFeeds = doc.getElementsByTagName("feed"); + for (int i=0; i< listOfFeeds.getLength() ; i++){ + Node fnode = listOfFeeds.item(i); + if (fnode.getNodeType() == Node.ELEMENT_NODE){ + Element feedElement = (Element) fnode; + + String fname = getNodeFieldValue(feedElement,"name"); + String froot = getNodeFieldValue(feedElement,"root"); + String ftableSchema = getNodeFieldValue(feedElement,"tableschema"); + String fptningSchema = getNodeFieldValue(feedElement,"partitioningschema"); + String ftableIsd = getNodeFieldValue(feedElement,"table.hcat.isd"); + String ftableOsd = getNodeFieldValue(feedElement,"table.hcat.osd"); + String fptnIsd = getNodeFieldValue(feedElement,"ptn.hcat.isd"); + + String ftableIf = getNodeFieldValue(feedElement,"table.if"); + String ftableOf = getNodeFieldValue(feedElement,"table.of"); + String ftableSerde = getNodeFieldValue(feedElement,"table.serde"); + String fptnIf = getNodeFieldValue(feedElement,"ptn.if"); + String fptnSchema = getNodeFieldValue(feedElement,"ptn.schema"); + String fptnSerde = getNodeFieldValue(feedElement,"ptn.serde"); + + String ftableParams = getNodeFieldValue(feedElement,"table.hcat.param"); + String fptnParams = getNodeFieldValue(feedElement,"ptn.hcat.param"); + + feedMap.put(fname, new HashMap()); + feedMap.get(fname).put("root", froot); + feedMap.get(fname).put("table.schema", ftableSchema); + feedMap.get(fname).put("partitioning.schema", fptningSchema); + + feedMap.get(fname).put("table.if", ftableIf); + feedMap.get(fname).put("table.of", ftableOf); + feedMap.get(fname).put("table.serde", ftableSerde); + feedMap.get(fname).put("ptn.if", fptnIf); + feedMap.get(fname).put("ptn.serde", fptnSerde); + feedMap.get(fname).put("ptn.schema", fptnSchema); + + tablePropertiesMap.put(fname, new HashMap()); + tablePropertiesMap.get(fname).put("hcat.isd", ftableIsd); + tablePropertiesMap.get(fname).put("hcat.osd", ftableOsd); + if (ftableParams != null){ + for (String kvpair : ftableParams.split(",")){ + String[] kvp = kvpair.split("=",2); + if (kvp.length == 2){ + tablePropertiesMap.get(fname).put("hcat."+kvp[0], kvp[1]); + } + } + } + + 
ptnPropertiesMap.put(fname, new HashMap()); + ptnPropertiesMap.get(fname).put("hcat.isd", fptnIsd); + if (fptnParams != null){ + for (String kvpair : fptnParams.split(",")){ + String[] kvp = kvpair.split("=",2); + if (kvp.length == 2){ + ptnPropertiesMap.get(fname).put("hcat."+kvp[0], kvp[1]); + } + } + } + } + } + + excludes = new ArrayList(); + NodeList listOfExcludes = doc.getElementsByTagName("excludes"); + for (int i=0; i< listOfExcludes.getLength() ; i++){ + Node enode = listOfExcludes.item(i); + if (enode.getNodeType() == Node.ELEMENT_NODE){ + NodeList textValueList = ((Element)enode).getChildNodes(); + if (textValueList.getLength() > 0){ + excludes.add(((Node)textValueList.item(0)).getNodeValue().trim()); + } + } + } + includes = new ArrayList(); + NodeList listOfIncludes = doc.getElementsByTagName("includes"); + for (int i=0; i< listOfIncludes.getLength() ; i++){ + Node enode = listOfIncludes.item(i); + if (enode.getNodeType() == Node.ELEMENT_NODE){ + NodeList textValueList = ((Element)enode).getChildNodes(); + if (textValueList.getLength() > 0){ + includes.add(((Node)textValueList.item(0)).getNodeValue().trim()); + } + } + } + + hschemaMapperCache = new HashMap(); + + feedNamesByRoot = new HashMap(); + for (String feedName : getFeeds()){ + feedNamesByRoot.put(feedMap.get(feedName).get("root"), feedName); + } + + NodeList listOfDb = doc.getElementsByTagName("db"); + if (listOfDb.getLength() > 0 ){ // we pick only the first spec + Node dnode = listOfDb.item(0); + if (dnode.getNodeType() == Node.ELEMENT_NODE){ + NodeList textValueList = ((Element)dnode).getChildNodes(); + if (textValueList.getLength() > 0){ + dbToUse = ((Node)textValueList.item(0)).getNodeValue().trim(); + } + } + } + + } + + private String getNodeFieldValue(Element node, String fieldName) { + NodeList innerNodeList = node.getElementsByTagName(fieldName); + Element innerElement = (Element)innerNodeList.item(0); + if (innerElement == null){ + return null; + } + NodeList textValueList = innerElement.getChildNodes(); + if (textValueList.getLength() > 0){ + return ((Node)textValueList.item(0)).getNodeValue().trim(); + }else{ + return null; + } + } + + public Collection getFeeds() { + return feedMap.keySet(); + } + + public String getRoot(String feedName) { + return feedMap.get(feedName).get("root"); // FIXME : contains-guard-exception + } + + public int getDepth(String feedName) throws Exception { + // System.out.println("Getting depth for "+feedName); + HCatSchema hschema = getPartitioningSchema(feedName); + // System.out.println("partitioning schema is "+ hschema.toString()+". 
numFields is "+ hschema.getFields().size()); + return hschema.getFields().size(); + } + + public String getTableIf(String feedName) throws Exception { + return feedMap.get(feedName).get("table.if"); + } + + public String getTableOf(String feedName) throws Exception { + return feedMap.get(feedName).get("table.of"); + } + + public String getTableSerde(String feedName) throws Exception { + return feedMap.get(feedName).get("table.serde"); + } + + public String getPartitionInputFormat(String feedName) throws Exception { + return feedMap.get(feedName).get("ptn.if"); + } + + public String getPartitionSerde(String feedName) throws Exception { + return feedMap.get(feedName).get("ptn.serde"); + } + + public HCatSchema getPartitioningSchema(String feedName) + throws Exception { + String schemaName = feedMap.get(feedName).get("partitioning.schema"); // FIXME : contains-guard-exception + return getSchemaMapper(schemaName).getSchema(); + } + + public HCatSchema getTableSchema(String feedName) + throws Exception { + String schemaName = feedMap.get(feedName).get("table.schema"); // FIXME : contains-guard-exception + return getSchemaMapper(schemaName).getSchema(); + } + + public HCatSchemaMapper getPartitionSchemaMapper(String feedName) throws Exception { + String schemaName = feedMap.get(feedName).get("ptn.schema"); // FIXME : contains-guard-exception + return getSchemaMapper(schemaName); + } + + private void debugPrint(PrintStream ps, Object m) { + ps.println(m); + } + + public HCatSchemaMapper getSchemaMapper(String schemaName) throws Exception { + // System.out.println("Getting schema for "+ schemaName); + if (!hschemaMapperCache.containsKey(schemaName)){ + HCatSchemaMapper mapper; + if (schemaMappers.containsKey(schemaName)){ + mapper = (HCatSchemaMapper) Class.forName(schemaMappers.get(schemaName)).newInstance(); + }else{ + mapper = new StringHCatSchemaMapper(); + } + mapper.init(schemas.get(schemaName)); + hschemaMapperCache.put(schemaName, mapper); + } + return hschemaMapperCache.get(schemaName); + } + + public String getFeedNameByRoot(String root) throws Exception { + if (!feedNamesByRoot.containsKey(root)){ + System.err.println("did not find feed with root ["+root+"]"); + debugPrint(System.err,feedNamesByRoot); + } + return feedNamesByRoot.get(root); + } + + public List getExcludesRegexes() { + return excludes; + } + + public List getIncludesRegexes() { + return includes; + } + + public Map getTableParameters(String feedName) { + return tablePropertiesMap.get(feedName); // FIXME : contains-guard-exception + } + + public Map getPartitionParameters(String feedName) { + return ptnPropertiesMap.get(feedName); // FIXME : contains-guard-exception + } + + public String getDbToUse(){ + return dbToUse; + } +} Index: src/java/org/apache/hcatalog/tools/growl/taskbased/FileSystemNode.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/taskbased/FileSystemNode.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/taskbased/FileSystemNode.java (revision 0) @@ -0,0 +1,114 @@ +package org.apache.hcatalog.tools.growl.taskbased; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class FileSystemNode extends CrawlNode { + + Path path = null; + FileSystem fs = null; + FileStatus fstatus = null; + CrawlNode parent = 
null; + CrawlNode root = null; + List children = null; + int depthToTraverse = -1; + + public FileSystemNode(FileSystem fs, Path path) { + this.path = path; + this.fs = fs; + this.root = this; + this.depthToTraverse = -1; + } + + public FileSystemNode(FileSystem fs, Path path, int depthToTraverse) { + this(fs,path); + this.depthToTraverse = depthToTraverse; + } + + public FileSystemNode(Configuration conf, String uri) throws IOException { + this(FileSystem.get(URI.create(uri), conf),new Path(uri)); + } + + public FileSystemNode(Configuration conf, String uri,int depthToTraverse) throws IOException { + this(FileSystem.get(URI.create(uri), conf),new Path(uri),depthToTraverse); + } + + public FileSystemNode(FileSystem fs, Path path, FileSystemNode parent){ + this(fs,path); + this.parent = parent; + this.root = parent.getTreeRoot(); + int parentDepthRemaining = parent.getDepthRemaining(); + if (parentDepthRemaining == -1){ + this.depthToTraverse = -1; + }else{ + this.depthToTraverse = parentDepthRemaining - 1 ; + } + } + + private int getDepthRemaining() { +// System.out.println("["+toString()+"]'s depth is "+depthToTraverse); + return depthToTraverse; + } + + @SuppressWarnings("unused") + private FileSystemNode(){ + // private default ctor to prevent usage + } + + @Override + public List getChildren() throws IOException { + if (children == null){ + children = new ArrayList(); + if (getDepthRemaining() > 0){ + calculatePathFileStatus(); + if (fstatus.isDir()){ + FileStatus[] fsentries = fs.listStatus(path); + for (FileStatus entry : fsentries){ + if (entry.isDir()){ + // we're not interested in files - they can't be end-nodes or go further, so we skip them + children.add(new FileSystemNode(fs,entry.getPath(),this)); + } + } + } + } + } + return children; + } + + private void calculatePathFileStatus() throws IOException { + if (fstatus == null){ + fstatus = fs.getFileStatus(path); + } + } + + @Override + public CrawlNode getParent() { + return parent; + } + + @Override + public CrawlNode getTreeRoot() { + return root; + } + + public Path getPath(){ + return path; + } + + public URI getURI(){ + return path.toUri(); + } + + @Override + public String toString() { + return getURI().toString(); + } + +} Index: src/java/org/apache/hcatalog/tools/growl/taskbased/CrawlTask.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/taskbased/CrawlTask.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/taskbased/CrawlTask.java (revision 0) @@ -0,0 +1,92 @@ +package org.apache.hcatalog.tools.growl.taskbased; + +import java.util.Collection; + +import org.apache.hcatalog.tools.growl.PartitionProxy; +import org.apache.hcatalog.tools.growl.TableProxy; +import org.apache.hcatalog.tools.growl.UsageException; +import org.apache.hcatalog.tools.growl.MetadataCallback.STRATEGY; + +public abstract class CrawlTask { + + public static final String CRAWL_TASK_ARGS = "CrawlTaskArgs"; + public static final String CRAWL_TASK_DUPLICATE_TABLE_STRATEGY = "CrawlTaskDuplicateTableStrategy"; + public static final String CRAWL_TASK_DUPLICATE_PTN_STRATEGY = "CrawlTaskDuplicatePtnStrategy"; + protected TaskConf taskConf; + + protected STRATEGY defaultPtnStrategy = STRATEGY.ERROR; + protected STRATEGY defaultTableStrategy = STRATEGY.ERROR; + + public CrawlTask() throws Exception{ + if (! 
System.getProperties().containsKey(CRAWL_TASK_ARGS)){
+      throw new UsageException("Unable to instantiate self without argument specifying"
+          + " which task definition file to load. Please specify -D"+CRAWL_TASK_ARGS+"=");
+    }
+
+    taskConf = new TaskConf(System.getProperties().getProperty(CRAWL_TASK_ARGS));
+    if (System.getProperties().containsKey(CRAWL_TASK_DUPLICATE_PTN_STRATEGY)){
+      defaultPtnStrategy = STRATEGY.getStrategyByName(
+          System.getProperties().getProperty(CRAWL_TASK_DUPLICATE_PTN_STRATEGY)
+          );
+    }
+    if (System.getProperties().containsKey(CRAWL_TASK_DUPLICATE_TABLE_STRATEGY)){
+      defaultTableStrategy = STRATEGY.getStrategyByName(
+          System.getProperties().getProperty(CRAWL_TASK_DUPLICATE_TABLE_STRATEGY)
+          );
+    }
+  }
+
+  /**
+   * Given an item, determine the appropriate Table definition mapping for the same
+   */
+  public abstract TableProxy getTableProxy(CrawlNode item) throws Exception;
+
+  /**
+   * Feedback Hook that Growl will use to tell the Task Definition what
+   * table exists in the metadata for a given crawlable item
+   */
+  public abstract void supplyTableInfo(CrawlNode item, TableProxy tableInMetadata) throws Exception;
+
+  /**
+   * Specify what default strategy needs to be used when a duplicate table definition is found.
+   * @return
+   */
+  public STRATEGY getDuplicateTableStrategy(){
+    return defaultTableStrategy;
+  }
+
+  /**
+   * Given an item, determine the appropriate partition definition mapping for the same
+   */
+  public abstract PartitionProxy getPartitionProxy(CrawlNode item) throws Exception;
+
+  /**
+   * Feedback Hook that Growl will use to tell the Task Definition what
+   * partition exists in the metadata for a given crawlable item
+   */
+  public abstract void supplyPartitionInfo(CrawlNode item, PartitionProxy ptnInMetadata) throws Exception;
+
+  /**
+   * Specify what default strategy needs to be used when a duplicate partition definition is found.
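+   * (Defaults to ERROR unless overridden via the CrawlTaskDuplicatePtnStrategy system property read in the constructor above.)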
+ * @return + */ + public STRATEGY getDuplicatePartitionStrategy(){ + return defaultPtnStrategy; + } + + /** + * Get a list of all root-level CrawlNode items that this task is expected to represent + */ + public abstract Collection getRootItems(); + + /** + * Determine if a given crawlnode is crawlable/should be crawled or not + * @param item + * @return + */ + public boolean isCrawlable(CrawlNode item) throws Exception{ + return true; + } + +} Index: src/java/org/apache/hcatalog/tools/growl/taskbased/StringHCatSchemaMapper.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/taskbased/StringHCatSchemaMapper.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/taskbased/StringHCatSchemaMapper.java (revision 0) @@ -0,0 +1,25 @@ +package org.apache.hcatalog.tools.growl.taskbased; + +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; + +public class StringHCatSchemaMapper implements HCatSchemaMapper { + + HCatSchema hsc = null; + + @Override + public void init(String params) throws Exception { + hsc = HCatSchemaUtils.getHCatSchema(params); + } + + @Override + public HCatSchema getSchema() throws Exception { + return hsc; + } + + @Override + public HCatSchema getSchema(String locnSuffix) throws Exception { + return hsc; + } + +} Index: src/java/org/apache/hcatalog/tools/growl/taskbased/CrawlNode.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/taskbased/CrawlNode.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/taskbased/CrawlNode.java (revision 0) @@ -0,0 +1,23 @@ +package org.apache.hcatalog.tools.growl.taskbased; + +import java.io.IOException; +import java.util.List; + +public abstract class CrawlNode { + + public abstract List getChildren() throws IOException; + + public abstract CrawlNode getParent(); + + public abstract CrawlNode getTreeRoot(); + + public boolean isLeaf() throws IOException{ + return (getChildren().size() == 0); + } + + public boolean isRoot() throws IOException{ + return (getParent() == null); + } + + +} Index: src/java/org/apache/hcatalog/tools/growl/UsageException.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/UsageException.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/UsageException.java (revision 0) @@ -0,0 +1,15 @@ +package org.apache.hcatalog.tools.growl; + +public class UsageException extends Exception { + + String usage = null; + + private UsageException() {} // preventing default ctor, we want the usage to be specified. 
+ public UsageException(String usage){ + this.usage = usage; + } + + public String getUsage(){ + return usage; + } +} Index: src/java/org/apache/hcatalog/tools/growl/GrowlUtil.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/GrowlUtil.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/GrowlUtil.java (revision 0) @@ -0,0 +1,73 @@ +package org.apache.hcatalog.tools.growl; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Map.Entry; + +public class GrowlUtil { + + public static String printList(List list) { + StringBuilder sb = new StringBuilder(); + boolean firstItem = true; + for (Object o : list){ + if (firstItem){ + sb.append(o.toString()); + firstItem = false; + }else{ + sb.append(','); + sb.append(o.toString()); + } + } + return sb.toString(); + } + + public static Object instantiateFromProperties(String propName) throws Exception{ + if (System.getProperties().containsKey(propName)){ + try { + return Class.forName(System.getProperties().getProperty(propName)).newInstance(); + } catch (ClassNotFoundException cnfe){ + throw new Exception( + "Unable to load class specified("+System.getProperties().getProperty(propName)+") by -D" + + propName + "=.class in System properties, class not found." + ,cnfe); + } catch (Exception e) { + if (e instanceof UsageException){ + throw e; + } else { + throw new Exception( + "Unable to instantiate class specified("+System.getProperties().getProperty(propName)+") by -D" + + propName + "=.class in System properties." + ,e); + } + } + }else{ + throw new UsageException( + propName + " impl not specified - please specify -D" + + propName + "=.class in System properties" + ); + } + + } + + public static void dumpSystemProperties() { + Properties props = System.getProperties(); + for ( Entry entry : props.entrySet()){ + System.out.println("fs"+entry.getKey()+"=>"+entry.getValue()); + } + } + + public static String paramPrint(Map parameters) { + StringBuilder sb = new StringBuilder(); + boolean first = true; + for (String key : parameters.keySet()){ + if (!first){ + sb.append(","); + }else{ + first = false; + } + sb.append(key + "=" + parameters.get(key)); + } + return sb.toString(); + } +} Index: src/java/org/apache/hcatalog/tools/growl/TableProxy.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/TableProxy.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/TableProxy.java (revision 0) @@ -0,0 +1,151 @@ +package org.apache.hcatalog.tools.growl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.Type; +import org.apache.hadoop.hive.serde.Constants; + +public class TableProxy { + + Table tableUnderConstruction = null; + + public TableProxy(Table table){ +// System.out.println("Provided table"); + this.tableUnderConstruction = table; + } + + public TableProxy(String tableName){ + this(); + this.withName(tableName); + } + + private TableProxy(){ + 
tableUnderConstruction = new Table(); + this.inDb(null); + tableUnderConstruction.setPartitionKeys(new ArrayList()); + + tableUnderConstruction.setParameters(new HashMap()); + this.withParameter("EXTERNAL", "TRUE"); + + StorageDescriptor sd = new StorageDescriptor(); + sd.setCompressed(false); + sd.setNumBuckets(1); + sd.setParameters(new HashMap()); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setName(tableUnderConstruction.getTableName()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSerdeInfo().getParameters().put(Constants.SERIALIZATION_FORMAT, "1"); + sd.getSerdeInfo().setSerializationLib( + org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); + sd.setSortCols(new ArrayList()); + + tableUnderConstruction.setSd(sd); + } + + public TableProxy withParameter(String key, String value){ +// System.out.println("Provided param key["+key+"] value["+value+"]"); + tableUnderConstruction.getParameters().put(key, value); + return this; + } + + public TableProxy withInputFormat(String inputFormatClass){ +// System.out.println("Provided inputFormat["+inputFormatClass+"]"); + if ((inputFormatClass == null)||(inputFormatClass.isEmpty())){ + // default to RCFile + inputFormatClass = org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName(); + } + tableUnderConstruction.getSd().setInputFormat(inputFormatClass); + return this; + } + + public TableProxy withOutputFormat(String outputFormatClass){ +// System.out.println("Provided outputFormat["+outputFormatClass+"]"); + if ((outputFormatClass == null)||(outputFormatClass.isEmpty())){ + // default to RCFile + outputFormatClass = org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName(); + } + tableUnderConstruction.getSd().setOutputFormat(outputFormatClass); + return this; + } + + public TableProxy withSerde(String serdeClassName){ +// System.out.println("Provided serde ["+serdeClassName+"]"); + if ((serdeClassName == null)||(serdeClassName.isEmpty())){ + // default to lazyserde + // System.out.println("Using default lazy serde"); + serdeClassName = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName(); + } + tableUnderConstruction.getSd().getSerdeInfo().setSerializationLib(serdeClassName); + return this; + } + + public TableProxy withPartitioningSchema(HCatSchema hschema){ +// System.out.println("Provided partitioning hschema ["+hschema+"]"); + tableUnderConstruction.setPartitionKeys( + HCatSchemaUtils.getFieldSchemas(hschema.getFields()) + ); + return this; + } + + public TableProxy withPartitioningSchema(String hschemaString) throws Exception{ +// System.out.println("Provided partitioning hschemaString ["+hschemaString+"]"); + return this.withPartitioningSchema(HCatSchemaUtils.getHCatSchema(hschemaString)); + } + + public TableProxy withTableSchema(HCatSchema hschema){ +// System.out.println("Provided table hschema ["+hschema+"]"); + + Type typeDefn = new Type(); + typeDefn.setName(tableUnderConstruction.getTableName()); + + List fieldSchemas = HCatSchemaUtils.getFieldSchemas(hschema.getFields()); + typeDefn.setFields(new ArrayList(fieldSchemas.size())); + typeDefn.getFields().addAll(fieldSchemas); + + tableUnderConstruction.getSd().setCols(typeDefn.getFields()); + + return this; + } + + public TableProxy withTableSchema(String hschemaString) throws Exception{ +// System.out.println("Provided table hschemastring ["+hschemaString+"]"); + return this.withTableSchema(HCatSchemaUtils.getHCatSchema(hschemaString)); + } + + public TableProxy inDb(String dbName){ + if (dbName == null){ +// 
System.out.println("Provided db ["+dbName+"], using ["+MetaStoreUtils.DEFAULT_DATABASE_NAME+"]"); + tableUnderConstruction.setDbName(MetaStoreUtils.DEFAULT_DATABASE_NAME); + }else{ +// System.out.println("Provided db ["+dbName+"]"); + tableUnderConstruction.setDbName(dbName); + } + return this; + } + + public TableProxy withName(String tableName){ +// System.out.println("Provided tableName ["+tableName+"]"); + tableUnderConstruction.setTableName(tableName); + return this; + } + + public Table getTable(){ + return tableUnderConstruction; + } + + public TableProxy withLocation(String locn) { +// System.out.println("Provided locn ["+locn+"]"); + tableUnderConstruction.getSd().setLocation(locn); + return this; + } +} Index: src/java/org/apache/hcatalog/tools/growl/Growl.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/Growl.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/Growl.java (revision 0) @@ -0,0 +1,28 @@ +package org.apache.hcatalog.tools.growl; + +import java.io.PrintStream; + +public class Growl { + + public static final String GROWL_CRAWLER_IMPL = "GrowlCrawlerImpl"; + + public static void main(String args[]) throws Exception{ + try { + Crawler c = (Crawler) GrowlUtil.instantiateFromProperties(GROWL_CRAWLER_IMPL); + MetadataCallback mdc = new MetadataCallback(); + + c.registerMetadataCallback(mdc); + c.crawl(); + } catch (UsageException ue){ + usage(System.err); + System.err.println("\n\nUsage Exception: " + ue.getUsage()); + } + } + + public static void usage(PrintStream ps){ + ps.println("Usage : "); + ps.println(" growl -DGrowlCrawlerImpl="); + ps.println("(An example of a crawler implementing class provided with Growl is " + +"TaskBasedCrawler, and that might require further -D parameters)"); + } +} Index: src/java/org/apache/hcatalog/tools/growl/PartitionProxy.java =================================================================== --- src/java/org/apache/hcatalog/tools/growl/PartitionProxy.java (revision 0) +++ src/java/org/apache/hcatalog/tools/growl/PartitionProxy.java (revision 0) @@ -0,0 +1,189 @@ +package org.apache.hcatalog.tools.growl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.Type; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; + +public class PartitionProxy { + + Partition ptnUnderConstruction = null; + Table parentTable = null; + + boolean parentChanged = false; + + public PartitionProxy(Partition ptn){ + this.ptnUnderConstruction = ptn; + } + + public PartitionProxy(){ + ptnUnderConstruction = new Partition(); + ptnUnderConstruction.setParameters(new HashMap()); + ptnUnderConstruction.setValues(new ArrayList()); + } + + public PartitionProxy inDb(String dbName){ + ptnUnderConstruction.setDbName(dbName); + return this; + } + + public PartitionProxy inTable(String tableName){ + ptnUnderConstruction.setTableName(tableName); + return this; + } + + public PartitionProxy inTable(Table table) throws Exception{ + this.parentTable = table; + parentTableNullCheck(); + this.inDb(parentTable.getDbName()).inTable(parentTable.getTableName()); + 
initializeStorageDescriptorIfNecessary();
+    return this;
+  }
+
+  public PartitionProxy withPartitionSchema(HCatSchema hschema) throws Exception {
+//    System.out.println("Provided partition hschema ["+hschema+"]");
+    parentTableNullCheck();
+
+    Type typeDefn = new Type();
+    typeDefn.setName(ptnUnderConstruction.getTableName());
+
+    List<FieldSchema> fieldSchemas = HCatSchemaUtils.getFieldSchemas(hschema.getFields());
+    typeDefn.setFields(new ArrayList<FieldSchema>(fieldSchemas.size()));
+    typeDefn.getFields().addAll(fieldSchemas);
+
+    initializeStorageDescriptorIfNecessary();
+
+    List<FieldSchema> modifiedParentCols;
+    ptnUnderConstruction.getSd().setCols(typeDefn.getFields());
+    try {
+      modifiedParentCols = mergeColumns(typeDefn.getFields(), parentTable.getSd().getCols());
+      if (parentChanged){
+        parentTable.getSd().setCols(modifiedParentCols);
+      }
+    } catch (Exception e){
+      throw new Exception("Error merging partition schema ["+hschema+"]", e);
+    }
+
+    return this;
+  }
+
+  private List<FieldSchema> mergeColumns(List<FieldSchema> ptnColumns, List<FieldSchema> tblColumns) throws Exception {
+    Set<String> seenColumns = new HashSet<String>();
+    List<FieldSchema> mergedColumns = new ArrayList<FieldSchema>();
+
+    mergeColumns(mergedColumns, tblColumns, 0, ptnColumns, 0, seenColumns);
+    return mergedColumns;
+  }
+
+  private void mergeColumns(List<FieldSchema> mergedColumns,
+      List<FieldSchema> tblSchema, int tblIndex,
+      List<FieldSchema> ptnSchema, int ptnIndex,
+      Set<String> seenColumns) throws Exception {
+    if (tblIndex < tblSchema.size()){
+      mergedColumns.add(tblSchema.get(tblIndex));
+      seenColumns.add(tblSchema.get(tblIndex).getName());
+      if ((ptnIndex < ptnSchema.size())
+          && (tblSchema.get(tblIndex).compareTo(ptnSchema.get(ptnIndex)) == 0)){
+        mergeColumns(mergedColumns, tblSchema, tblIndex+1, ptnSchema, ptnIndex+1, seenColumns);
+        return;
+      }
+      mergeColumns(mergedColumns, tblSchema, tblIndex+1, ptnSchema, ptnIndex, seenColumns);
+      return;
+    }
+    while (ptnIndex < ptnSchema.size()){
+      // if we're here, then tableSchema is already depleted.
+      if (seenColumns.contains(ptnSchema.get(ptnIndex).getName())){
+        throw new Exception("Partition column order mismatch on merge!");
+      }
+      mergedColumns.add(ptnSchema.get(ptnIndex));
+      seenColumns.add(ptnSchema.get(ptnIndex).getName());
+      ptnIndex++;
+      this.parentChanged = true;
+    }
+  }
+
+  private void initializeStorageDescriptorIfNecessary() throws Exception {
+    parentTableNullCheck();
+    if (ptnUnderConstruction.getSd() == null){
+      ptnUnderConstruction.setSd(parentTable.getSd().deepCopy());
+//      ptnUnderConstruction.getSd().setSerdeInfo(parentTable.getSd().getSerdeInfo().deepCopy());
+    }
+  }
+
+  public PartitionProxy withInputFormat(String inputFormatClass) throws Exception {
+    initializeStorageDescriptorIfNecessary();
+    if ((inputFormatClass == null)||(inputFormatClass.isEmpty())){
+      // default to RCFile
+      inputFormatClass = org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName();
+    }
+    ptnUnderConstruction.getSd().setInputFormat(inputFormatClass);
+    return this;
+  }
+
+  public PartitionProxy withSerde(String serdeClassName) throws Exception {
+    initializeStorageDescriptorIfNecessary();
+    if ((serdeClassName == null)||(serdeClassName.isEmpty())){
+      // default to lazyserde
+      serdeClassName = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
+    }
+    ptnUnderConstruction.getSd().getSerdeInfo().setSerializationLib(serdeClassName);
+    return this;
+  }
+
+  public PartitionProxy withLocation(String locn) throws Exception {
+    initializeStorageDescriptorIfNecessary();
+    ptnUnderConstruction.getSd().setLocation(locn);
+    return this;
+  }
+
+  public PartitionProxy withLocationSuffix(String locnSuffix) throws Exception {
+    parentTableNullCheck();
+    ptnUnderConstruction.getSd().setLocation(parentTable.getSd().getLocation() + "/" + locnSuffix);
+    return this;
+  }
+
+  public PartitionProxy withValue(String pval){
+    ptnUnderConstruction.getValues().add(pval);
+    return this;
+  }
+
+  public PartitionProxy withValues(List<String> pvals) {
+    ptnUnderConstruction.getValues().addAll(pvals);
+    return this;
+  }
+
+  public PartitionProxy withParameter(String key, String value){
+    ptnUnderConstruction.getParameters().put(key, value);
+    return this;
+  }
+
+  private void parentTableNullCheck() throws Exception {
+    if (parentTable == null){
+      throw new Exception("Parent table provided is null!");
+    }
+  }
+
+  public Partition getPartition(){
+    return ptnUnderConstruction;
+  }
+
+  public boolean parentChanged(){
+    return this.parentChanged;
+  }
+
+  public Table getParentTable(){
+    return this.parentTable;
+  }
+}
Index: src/java/org/apache/hcatalog/tools/growl/MetadataCallback.java
===================================================================
--- src/java/org/apache/hcatalog/tools/growl/MetadataCallback.java (revision 0)
+++ src/java/org/apache/hcatalog/tools/growl/MetadataCallback.java (revision 0)
@@ -0,0 +1,229 @@
+package org.apache.hcatalog.tools.growl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.thrift.TException;
+
+public class MetadataCallback {
+
+  public enum STRATEGY {
+    DROPANDRECREATE("dropandrecreate"),
+    ERROR("error"),
IGNORE("ignore"); + + private String strat; + + private STRATEGY(String strategy){ + this.strat = strategy; + } + + public static STRATEGY getStrategyByName(String strategy) { + return STRATEGY.valueOf(strategy.toUpperCase()); + } + + + + } + + private static final String DEBUG_VERBOSE_EVERYTHING_PROPKEY = "DebugVerbosePrintAllMode"; + private static final String DEBUG_VERBOSE_PRINTS_PROPKEY = "DebugVerbosePrintMode"; + + private static boolean DEBUG_VERBOSE_EVERYTHING = false; + private static boolean DEBUG_VERBOSE_PRINTS = false; + + protected HiveMetaStoreClient client; + protected final HiveConf hiveConf; + + public MetadataCallback() throws Exception { + + if(System.getProperties().containsKey(DEBUG_VERBOSE_EVERYTHING_PROPKEY)){ + System.out.println("Debug printing of everything enabled"); + DEBUG_VERBOSE_EVERYTHING = true; + DEBUG_VERBOSE_PRINTS = true; + } + + if(System.getProperties().containsKey(DEBUG_VERBOSE_PRINTS_PROPKEY)){ + System.out.println("Debug printing enabled"); + DEBUG_VERBOSE_PRINTS = true; + } + + hiveConf = new HiveConf(this.getClass()); + // hiveConf.set("hive.metastore.local","true"); + // hiveConf.set("javax.jdo.option.ConnectionDriverName","org.apache.derby.jdbc.EmbeddedDriver"); + // hiveConf.set("javax.jdo.option.ConnectionURL","jdbc:derby:;databaseName=metastore_db;create=true"); + // hiveConf.set("hive.metastore.metadb.dir","file:///var/metastore/metadb/"); + // hiveConf.set("hive.metastore.uris","file:///var/metastore/metadb/"); + // hiveConf.set("hive.metastore.warehouse.dir","/user/hive/warehouse"); + // hiveConf.set("hive.metastore.connect.retries","5"); + // hiveConf.set("hive.metastore.rawstore.impl","org.apache.hadoop.hive.metastore.ObjectStore"); + try { + client = new HiveMetaStoreClient(hiveConf, null); + } catch (Throwable e) { + System.err.println("Unable to open the metastore"); + System.err.println(StringUtils.stringifyException(e)); + throw new Exception(e); + } + } + + public Partition createPartition(PartitionProxy ptnProxy, STRATEGY s) throws Exception { + String operation = ""; + Partition partition = ptnProxy.getPartition(); + Partition partitionInMetadata = null; + try { + partitionInMetadata = client.getPartition(partition.getDbName(), partition.getTableName(), partition.getValues()); + } catch (NoSuchObjectException e){ + // ok, all good, we can create the partition + } + if (partitionInMetadata == null){ + // partition did not already exist. + operation = "Created partition"; + addPartitionProxy(ptnProxy); + } else { + switch (s){ + // case ALTER : + // // implement if relevant + // break; + case DROPANDRECREATE : + operation = "Dropped and recreated partition"; + client.dropPartition(partition.getDbName(), partition.getTableName(), partition.getValues(),false); + // IMPORTANT : DO NOT delete data during the partition drop if the table isn't marked EXTERNAL + addPartitionProxy(ptnProxy); + break; + case IGNORE : + operation = "Partition already exists, ignoring create"; + System.out.println("Ignoring duplicate definition for partition " + + partition.getDbName() + "." 
+ partition.getTableName() + ":" + GrowlUtil.printList(partition.getValues()) + ); + break; + case ERROR : // carry through to default - default should ERROR + default : + throw new Exception( + "Duplicate Partition Definition found for Table : " + + partition.getDbName() +"."+partition.getTableName() + ":" + GrowlUtil.printList(partition.getValues()) + ); + } + } + // tell the task what partition we just created + Partition retval = getPartition(ptnProxy); + debugPrintPartition(operation, retval); + return retval; + } + + public Partition getPartition(PartitionProxy pproxy) throws Exception { + return client.getPartition(pproxy.getPartition().getDbName(), pproxy.getPartition().getTableName(), pproxy.getPartition().getValues()); + } + + public Table getTable(PartitionProxy pproxy) throws Exception { + return client.getTable(pproxy.getPartition().getDbName(), pproxy.getPartition().getTableName()); + } + + public Table getTable(TableProxy tproxy) throws Exception { + return client.getTable(tproxy.getTable().getDbName(), tproxy.getTable().getTableName()); + } + + private void addPartitionProxy(PartitionProxy ptnProxy) throws Exception { + if (ptnProxy.parentChanged()){ + client.alter_table(ptnProxy.getPartition().getDbName(), ptnProxy.getPartition().getTableName(), ptnProxy.getParentTable()); + debugPrintTable( + "Modified parent table for schema evolution", + client.getTable(ptnProxy.getPartition().getDbName(), ptnProxy.getPartition().getTableName()) + ); + } + client.add_partition(ptnProxy.getPartition()); + } + + public Table createTable(TableProxy tblProxy, STRATEGY s) throws Exception { + Table table = tblProxy.getTable(); + Table tableInMetadata = null; + String operation = ""; + try { + tableInMetadata = client.getTable(table.getDbName(), table.getTableName()); + } catch (NoSuchObjectException e) { + // ok, all good, we can create the table + } + if (tableInMetadata == null){ + // table did not already exist. + operation = "Created table"; + client.createTable(table); + } else { + switch (s){ + // case ALTER : + // // implement if relevant + // break; + case DROPANDRECREATE : + operation = "Dropped and recreated table"; + client.dropTable(table.getDbName(), table.getTableName(),false,true); + // DANGER! IMPORTANT! DO NOT DROP DATA from table when deleting if not marked EXTERNAL, especially if the location points to the same place - you'll nuke your data dir + client.createTable(table); + break; + case IGNORE : + operation = "Table already exists, ignoring create"; + System.out.println("Ignoring duplicate definition for table " + + table.getDbName() + "." + table.getTableName()); + break; + case ERROR : // carry through to default - default should ERROR + default : + throw new Exception( + "Duplicate Table Definition found for Table : " + + table.getDbName() +"."+table.getTableName() + ); + } + } + // tell the task what table we just created. 
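+    // re-read the table from the metastore so the caller sees the definition as actually stored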
+ Table retval = getTable(tblProxy); + debugPrintTable(operation,retval); + return retval; + } + + protected void showAllMetadata() throws Exception { + if (DEBUG_VERBOSE_EVERYTHING){ + for (String dbName : client.getAllDatabases()){ + System.out.println("DB:"+dbName+">"); + for (String tableName : client.getAllTables(dbName)){ + Table tbl = client.getTable(dbName, tableName); + debugPrintTable("",tbl); + for (Partition ptn : client.listPartitions(dbName, tableName, Short.MAX_VALUE)){ + debugPrintPartition("",ptn); + } + } + } + } + } + + private void debugPrintTable(String prefix, Table tbl) throws HCatException { + if (DEBUG_VERBOSE_PRINTS){ + System.out.println("\t"+prefix+" "+tbl.getTableName()+">>"); + System.out.println("\t\t:Location:" + tbl.getSd().getLocation()); + System.out.println("\t\t:Schema:"+HCatSchemaUtils.getHCatSchema(tbl.getSd().getCols())); + System.out.println("\t\t:IF:"+tbl.getSd().getInputFormat()); + System.out.println("\t\t:OF:"+tbl.getSd().getOutputFormat()); + System.out.println("\t\t:Serde:"+tbl.getSd().getSerdeInfo()); + System.out.println("\t\t:Parameters:" + GrowlUtil.paramPrint(tbl.getParameters())); + } + } + + private void debugPrintPartition(String prefix, Partition ptn) throws HCatException { + if (DEBUG_VERBOSE_PRINTS){ + System.out.println("\t\t"+prefix+" " + GrowlUtil.printList(ptn.getValues())+">>>"); + System.out.println("\t\t\t:Location:" + ptn.getSd().getLocation()); + System.out.println("\t\t\t:Schema:"+HCatSchemaUtils.getHCatSchema(ptn.getSd().getCols())); + System.out.println("\t\t\t:IF:"+ptn.getSd().getInputFormat()); + System.out.println("\t\t\t:OF:"+ptn.getSd().getOutputFormat()); + System.out.println("\t\t\t:Serde:"+ptn.getSd().getSerdeInfo()); + System.out.println("\t\t\t:Parameters:" + GrowlUtil.paramPrint(ptn.getParameters())); + } + } + + public static void main(String args[]) throws Exception { + (new MetadataCallback()).showAllMetadata(); + } + + +}
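Example invocation (an illustrative sketch, not part of the patch: the -D property names below are the ones defined in the code above — GrowlCrawlerImpl, CrawlTaskImpl, CrawlTaskArgs, CrawlTaskDuplicatePtnStrategy — while the file path and classpath setup are hypothetical):

  java -cp <hcatalog-and-hive-classpath> \
       -DGrowlCrawlerImpl=org.apache.hcatalog.tools.growl.taskbased.TaskBasedCrawler \
       -DCrawlTaskImpl=org.apache.hcatalog.tools.growl.taskbased.FSCrawlTask \
       -DCrawlTaskArgs=/path/to/growl-task.xml \
       -DCrawlTaskDuplicatePtnStrategy=ignore \
       org.apache.hcatalog.tools.growl.Growl

A minimal task-definition file that TaskConf above would parse might look like the following; the element names (db, hcatschema, feed, root, tableschema, partitioningschema, ptn.schema, excludes) are taken from the parser, but the root element name, feed name, paths, and schema value strings are made-up placeholders in whatever format HCatSchemaUtils.getHCatSchema (or the configured mapper) expects:

  <growltask>
    <db>default</db>
    <hcatschema>
      <name>click_cols</name>
      <value>userid:string,url:string</value>
    </hcatschema>
    <hcatschema>
      <name>click_ptn</name>
      <value>dt:string</value>
    </hcatschema>
    <feed>
      <name>clicks</name>
      <root>hdfs://namenode:8020/data/clicks</root>
      <tableschema>click_cols</tableschema>
      <partitioningschema>click_ptn</partitioningschema>
      <ptn.schema>click_cols</ptn.schema>
    </feed>
    <excludes>.*_temporary.*</excludes>
  </growltask>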