diff --git metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b28983f..7153adc 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -118,6 +118,10 @@ import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.metatool.BlockRetrieverIterable; +import org.apache.hadoop.hive.metastore.metatool.BlockRetrieverIterator; +import org.apache.hadoop.hive.metastore.metatool.IDataProvider; +import org.apache.hadoop.hive.metastore.metatool.RetrieverIterable; import org.apache.hadoop.hive.metastore.model.MColumnDescriptor; import org.apache.hadoop.hive.metastore.model.MConstraint; import org.apache.hadoop.hive.metastore.model.MDBPrivilege; @@ -6378,405 +6382,53 @@ public long executeJDOQLUpdate(String queryStr) { } } - private boolean shouldUpdateURI(URI onDiskUri, URI inputUri) { - String onDiskHost = onDiskUri.getHost(); - String inputHost = inputUri.getHost(); - - int onDiskPort = onDiskUri.getPort(); - int inputPort = inputUri.getPort(); - - String onDiskScheme = onDiskUri.getScheme(); - String inputScheme = inputUri.getScheme(); - - //compare ports - if (inputPort != -1) { - if (inputPort != onDiskPort) { - return false; - } - } - //compare schemes - if (inputScheme != null) { - if (onDiskScheme == null) { - return false; - } - if (!inputScheme.equalsIgnoreCase(onDiskScheme)) { - return false; - } - } - //compare hosts - if (onDiskHost != null) { - if (!inputHost.equalsIgnoreCase(onDiskHost)) { - return false; - } - } else { - return false; - } - return true; - } - - public class UpdateMDatabaseURIRetVal { - private List badRecords; - private Map updateLocations; - - UpdateMDatabaseURIRetVal(List 
badRecords, Map updateLocations) { - this.badRecords = badRecords; - this.updateLocations = updateLocations; - } - - public List getBadRecords() { - return badRecords; - } - - public void setBadRecords(List badRecords) { - this.badRecords = badRecords; - } - - public Map getUpdateLocations() { - return updateLocations; - } - - public void setUpdateLocations(Map updateLocations) { - this.updateLocations = updateLocations; - } - } - - /** The following APIs - * - * - updateMDatabaseURI - * - * is used by HiveMetaTool. This API **shouldn't** be exposed via Thrift. - * - */ - public UpdateMDatabaseURIRetVal updateMDatabaseURI(URI oldLoc, URI newLoc, boolean dryRun) { - boolean committed = false; - Query query = null; - Map updateLocations = new HashMap(); - List badRecords = new ArrayList(); - UpdateMDatabaseURIRetVal retVal = null; - try { - openTransaction(); - query = pm.newQuery(MDatabase.class); - List mDBs = (List) query.execute(); - pm.retrieveAll(mDBs); - - for (MDatabase mDB : mDBs) { - URI locationURI = null; - String location = mDB.getLocationUri(); - try { - locationURI = new Path(location).toUri(); - } catch (IllegalArgumentException e) { - badRecords.add(location); - } - if (locationURI == null) { - badRecords.add(location); - } else { - if (shouldUpdateURI(locationURI, oldLoc)) { - String dbLoc = mDB.getLocationUri().replaceAll(oldLoc.toString(), newLoc.toString()); - updateLocations.put(locationURI.toString(), dbLoc); - if (!dryRun) { - mDB.setLocationUri(dbLoc); - } - } - } - } - committed = commitTransaction(); - if (committed) { - retVal = new UpdateMDatabaseURIRetVal(badRecords, updateLocations); - } - return retVal; - } finally { - rollbackAndCleanup(committed, query); - } - } - - public class UpdatePropURIRetVal { - private List badRecords; - private Map updateLocations; - - UpdatePropURIRetVal(List badRecords, Map updateLocations) { - this.badRecords = badRecords; - this.updateLocations = updateLocations; - } + /** + * The following used by 
HiveMetaTool. This API **shouldn't** be exposed via Thrift. + */ + private static class DataProvider implements IDataProvider { + private final ObjectStore objectStore; + private final Class clazz; - public List getBadRecords() { - return badRecords; + public DataProvider(ObjectStore objectStore, Class clazz) { + this.objectStore = objectStore; + this.clazz = clazz; } - public void setBadRecords(List badRecords) { - this.badRecords = badRecords; + @Override + public boolean commitTransaction() { + return objectStore.commitTransaction(); } - public Map getUpdateLocations() { - return updateLocations; + @Override + public void openTransaction() { + objectStore.openTransaction(); } - public void setUpdateLocations(Map updateLocations) { - this.updateLocations = updateLocations; + @Override + public void rollbackTransaction() { + objectStore.rollbackTransaction(); } - } - private void updatePropURIHelper(URI oldLoc, URI newLoc, String tblPropKey, boolean isDryRun, - List badRecords, Map updateLocations, - Map parameters) { - URI tablePropLocationURI = null; - if (parameters.containsKey(tblPropKey)) { - String tablePropLocation = parameters.get(tblPropKey); - try { - tablePropLocationURI = new Path(tablePropLocation).toUri(); - } catch (IllegalArgumentException e) { - badRecords.add(tablePropLocation); - } - // if tablePropKey that was passed in lead to a valid URI resolution, update it if - //parts of it match the old-NN-loc, else add to badRecords - if (tablePropLocationURI == null) { - badRecords.add(tablePropLocation); - } else { - if (shouldUpdateURI(tablePropLocationURI, oldLoc)) { - String tblPropLoc = parameters.get(tblPropKey).replaceAll(oldLoc.toString(), newLoc - .toString()); - updateLocations.put(tablePropLocationURI.toString(), tblPropLoc); - if (!isDryRun) { - parameters.put(tblPropKey, tblPropLoc); - } - } - } + @Override + public List getEntities(int from, int to) { + Query query = objectStore.pm.newQuery(clazz); + // fetch one block of entities in the half-open range [from, to)
+ query.setRange(from, to); + List entities = (List) query.execute(); + objectStore.pm.retrieveAll(entities); + return entities; } } /** The following APIs * - * - updateMStorageDescriptorTblPropURI + * - getBlockRetriever * * is used by HiveMetaTool. This API **shouldn't** be exposed via Thrift. * */ - public UpdatePropURIRetVal updateTblPropURI(URI oldLoc, URI newLoc, String tblPropKey, - boolean isDryRun) { - boolean committed = false; - Query query = null; - Map updateLocations = new HashMap<>(); - List badRecords = new ArrayList<>(); - UpdatePropURIRetVal retVal = null; - try { - openTransaction(); - query = pm.newQuery(MTable.class); - List mTbls = (List) query.execute(); - pm.retrieveAll(mTbls); - - for (MTable mTbl : mTbls) { - updatePropURIHelper(oldLoc, newLoc, tblPropKey, isDryRun, badRecords, updateLocations, - mTbl.getParameters()); - } - committed = commitTransaction(); - if (committed) { - retVal = new UpdatePropURIRetVal(badRecords, updateLocations); - } - return retVal; - } finally { - rollbackAndCleanup(committed, query); - } - } - - /** The following APIs - * - * - updateMStorageDescriptorTblPropURI - * - * is used by HiveMetaTool. This API **shouldn't** be exposed via Thrift.
- * - */ - @Deprecated - public UpdatePropURIRetVal updateMStorageDescriptorTblPropURI(URI oldLoc, URI newLoc, - String tblPropKey, boolean isDryRun) { - boolean committed = false; - Query query = null; - Map updateLocations = new HashMap(); - List badRecords = new ArrayList(); - UpdatePropURIRetVal retVal = null; - try { - openTransaction(); - query = pm.newQuery(MStorageDescriptor.class); - List mSDSs = (List) query.execute(); - pm.retrieveAll(mSDSs); - for (MStorageDescriptor mSDS : mSDSs) { - updatePropURIHelper(oldLoc, newLoc, tblPropKey, isDryRun, badRecords, updateLocations, - mSDS.getParameters()); - } - committed = commitTransaction(); - if (committed) { - retVal = new UpdatePropURIRetVal(badRecords, updateLocations); - } - return retVal; - } finally { - rollbackAndCleanup(committed, query); - } - } - - public class UpdateMStorageDescriptorTblURIRetVal { - private List badRecords; - private Map updateLocations; - private int numNullRecords; - - UpdateMStorageDescriptorTblURIRetVal(List badRecords, - Map updateLocations, int numNullRecords) { - this.badRecords = badRecords; - this.updateLocations = updateLocations; - this.numNullRecords = numNullRecords; - } - - public List getBadRecords() { - return badRecords; - } - - public void setBadRecords(List badRecords) { - this.badRecords = badRecords; - } - - public Map getUpdateLocations() { - return updateLocations; - } - - public void setUpdateLocations(Map updateLocations) { - this.updateLocations = updateLocations; - } - - public int getNumNullRecords() { - return numNullRecords; - } - - public void setNumNullRecords(int numNullRecords) { - this.numNullRecords = numNullRecords; - } - } - - /** The following APIs - * - * - updateMStorageDescriptorTblURI - * - * is used by HiveMetaTool. This API **shouldn't** be exposed via Thrift. 
- * - */ - public UpdateMStorageDescriptorTblURIRetVal updateMStorageDescriptorTblURI(URI oldLoc, - URI newLoc, boolean isDryRun) { - boolean committed = false; - Query query = null; - Map updateLocations = new HashMap(); - List badRecords = new ArrayList(); - int numNullRecords = 0; - UpdateMStorageDescriptorTblURIRetVal retVal = null; - try { - openTransaction(); - query = pm.newQuery(MStorageDescriptor.class); - List mSDSs = (List) query.execute(); - pm.retrieveAll(mSDSs); - for (MStorageDescriptor mSDS : mSDSs) { - URI locationURI = null; - String location = mSDS.getLocation(); - if (location == null) { // This can happen for View or Index - numNullRecords++; - continue; - } - try { - locationURI = new Path(location).toUri(); - } catch (IllegalArgumentException e) { - badRecords.add(location); - } - if (locationURI == null) { - badRecords.add(location); - } else { - if (shouldUpdateURI(locationURI, oldLoc)) { - String tblLoc = mSDS.getLocation().replaceAll(oldLoc.toString(), newLoc.toString()); - updateLocations.put(locationURI.toString(), tblLoc); - if (!isDryRun) { - mSDS.setLocation(tblLoc); - } - } - } - } - committed = commitTransaction(); - if (committed) { - retVal = new UpdateMStorageDescriptorTblURIRetVal(badRecords, updateLocations, numNullRecords); - } - return retVal; - } finally { - rollbackAndCleanup(committed, query); - } - } - - public class UpdateSerdeURIRetVal { - private List badRecords; - private Map updateLocations; - - UpdateSerdeURIRetVal(List badRecords, Map updateLocations) { - this.badRecords = badRecords; - this.updateLocations = updateLocations; - } - - public List getBadRecords() { - return badRecords; - } - - public void setBadRecords(List badRecords) { - this.badRecords = badRecords; - } - - public Map getUpdateLocations() { - return updateLocations; - } - - public void setUpdateLocations(Map updateLocations) { - this.updateLocations = updateLocations; - } - } - - /** The following APIs - * - * - updateSerdeURI - * - * is used by 
HiveMetaTool. This API **shouldn't** be exposed via Thrift. - * - */ - public UpdateSerdeURIRetVal updateSerdeURI(URI oldLoc, URI newLoc, String serdeProp, - boolean isDryRun) { - boolean committed = false; - Query query = null; - Map updateLocations = new HashMap(); - List badRecords = new ArrayList(); - UpdateSerdeURIRetVal retVal = null; - try { - openTransaction(); - query = pm.newQuery(MSerDeInfo.class); - List mSerdes = (List) query.execute(); - pm.retrieveAll(mSerdes); - for (MSerDeInfo mSerde : mSerdes) { - if (mSerde.getParameters().containsKey(serdeProp)) { - String schemaLoc = mSerde.getParameters().get(serdeProp); - URI schemaLocURI = null; - try { - schemaLocURI = new Path(schemaLoc).toUri(); - } catch (IllegalArgumentException e) { - badRecords.add(schemaLoc); - } - if (schemaLocURI == null) { - badRecords.add(schemaLoc); - } else { - if (shouldUpdateURI(schemaLocURI, oldLoc)) { - String newSchemaLoc = schemaLoc.replaceAll(oldLoc.toString(), newLoc.toString()); - updateLocations.put(schemaLocURI.toString(), newSchemaLoc); - if (!isDryRun) { - mSerde.getParameters().put(serdeProp, newSchemaLoc); - } - } - } - } - } - committed = commitTransaction(); - if (committed) { - retVal = new UpdateSerdeURIRetVal(badRecords, updateLocations); - } - return retVal; - } finally { - rollbackAndCleanup(committed, query); - } + public RetrieverIterable getBlockRetriever(Class clazz, int blockSize) { + return new BlockRetrieverIterable<>(new BlockRetrieverIterator(new DataProvider<>(this, clazz), blockSize)); } private void writeMTableColumnStatistics(Table table, MTableColumnStatistics mStatsObj, diff --git metastore/src/java/org/apache/hadoop/hive/metastore/metatool/BlockRetrieverIterable.java metastore/src/java/org/apache/hadoop/hive/metastore/metatool/BlockRetrieverIterable.java new file mode 100644 index 0000000..a539fff --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/metatool/BlockRetrieverIterable.java @@ -0,0 +1,20 @@ +package 
org.apache.hadoop.hive.metastore.metatool; + +import java.util.Iterator; + +public class BlockRetrieverIterable implements RetrieverIterable { + private final BlockRetrieverIterator it; + + public BlockRetrieverIterable(BlockRetrieverIterator it) { + this.it = it; + } + + @Override + public Iterator iterator() { + return it; + } + + public boolean isCommited() { + return it.isCommited(); + } +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/metatool/BlockRetrieverIterator.java metastore/src/java/org/apache/hadoop/hive/metastore/metatool/BlockRetrieverIterator.java new file mode 100644 index 0000000..f76148d --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/metatool/BlockRetrieverIterator.java @@ -0,0 +1,70 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import java.util.Iterator; +import java.util.List; + +public class BlockRetrieverIterator implements Iterator { + private final int blockSize; + private final IDataProvider dataProvider; + private int c; + private int actualIndex; + private boolean error = false; + private boolean committed = false; + private boolean isFirst = true; + List mSDSs; + + public BlockRetrieverIterator(IDataProvider dataProvider, int blockSize) { + this.dataProvider = dataProvider; + this.blockSize = blockSize; + } + + private void init() { + dataProvider.openTransaction(); + mSDSs = dataProvider.getEntities(c, c + blockSize); + } + + public boolean hasNext() { + if (isFirst) { + init(); + isFirst = false; + } + boolean hasNext = !error && actualIndex < mSDSs.size(); + if (!hasNext) { + committed = dataProvider.commitTransaction(); + } + return hasNext; + } + + public T next() { + T nextValue = mSDSs.get(actualIndex); + ++c; + committed = false; + if (actualIndex == mSDSs.size() - 1) { + committed = dataProvider.commitTransaction(); + if (!committed) { + dataProvider.rollbackTransaction(); + error = true; + return nextValue; + } + dataProvider.openTransaction(); + mSDSs = 
dataProvider.getEntities(c, c + blockSize); + actualIndex = 0; + if (mSDSs.size() == 0) { + System.out.println("Progress: " + c + " records were scanned."); + return nextValue; + } + System.out.println("Progress: " + c + " records were scanned."); + } else { + ++actualIndex; + } + return nextValue; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + + public boolean isCommited() { + return committed; + } +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/metatool/EntityUpdater.java metastore/src/java/org/apache/hadoop/hive/metastore/metatool/EntityUpdater.java new file mode 100644 index 0000000..60e8d51 --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/metatool/EntityUpdater.java @@ -0,0 +1,29 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import org.apache.hadoop.hive.metastore.ObjectStore; + +public class EntityUpdater { + + private final ObjectStore objectStore; + private final int blockSize; + + public EntityUpdater(ObjectStore objectStore, int blockSize) { + this.objectStore = objectStore; + this.blockSize = blockSize; + } + + public void update(LocationUpdater locationUpdater, Class clazz) { + RetrieverIterable entities = objectStore.getBlockRetriever(clazz, blockSize); + boolean reachedEnd = false; + try { + for (T entity : entities) { + locationUpdater.process(entity); + } + reachedEnd = true; + } finally { + if (!reachedEnd || !entities.isCommited()) { + locationUpdater.errorHappened(); + } + } + } +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/metatool/IDataProvider.java metastore/src/java/org/apache/hadoop/hive/metastore/metatool/IDataProvider.java new file mode 100644 index 0000000..db8f113 --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/metatool/IDataProvider.java @@ -0,0 +1,13 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import java.util.List; + +public interface IDataProvider { + boolean commitTransaction(); + + void 
openTransaction(); + + void rollbackTransaction(); + + List getEntities(int from, int to); +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/metatool/LocationEntity.java metastore/src/java/org/apache/hadoop/hive/metastore/metatool/LocationEntity.java new file mode 100644 index 0000000..c547e57 --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/metatool/LocationEntity.java @@ -0,0 +1,9 @@ +package org.apache.hadoop.hive.metastore.metatool; + +public interface LocationEntity { + void setEntity(T t); + + String getLocation(); + + void setLocation(String location); +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/metatool/LocationEntityImplementations.java metastore/src/java/org/apache/hadoop/hive/metastore/metatool/LocationEntityImplementations.java new file mode 100644 index 0000000..f6e32f6 --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/metatool/LocationEntityImplementations.java @@ -0,0 +1,148 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import org.apache.hadoop.hive.metastore.model.MDatabase; +import org.apache.hadoop.hive.metastore.model.MSerDeInfo; +import org.apache.hadoop.hive.metastore.model.MStorageDescriptor; +import org.apache.hadoop.hive.metastore.model.MTable; + +import java.util.HashMap; +import java.util.Map; + +public class LocationEntityImplementations { + public static class MDatabaseEntity implements LocationEntity { + private MDatabase mDatabase; + + @Override + public void setEntity(MDatabase mDatabase) { + this.mDatabase = mDatabase; + } + + @Override + public String getLocation() { + return mDatabase.getLocationUri(); + } + + @Override + public void setLocation(String location) { + mDatabase.setLocationUri(location); + } + } + + public static class MTableEntity implements LocationEntity { + private final String tablePropKey; + private Map parameters; + private MTable mTable; + + public
MTableEntity(String tablePropKey) { + this.tablePropKey = tablePropKey; + } + + @Override + public void setEntity(MTable mTable) { + this.mTable = mTable; + this.parameters = mTable.getParameters(); + } + + @Override + public String getLocation() { + if (parameters.containsKey(tablePropKey)) { + return parameters.get(tablePropKey); + } else { + return null; + } + } + + @Override + public void setLocation(String location) { + Map newParams = new HashMap<>(); + newParams.putAll(parameters); + newParams.put(tablePropKey, location); + mTable.setParameters(newParams); + } + } + + public static class MStorageDescriptorEntity implements LocationEntity { + private MStorageDescriptor mStorageDescriptor; + + @Override + public void setEntity(MStorageDescriptor mStorageDescriptor) { + this.mStorageDescriptor = mStorageDescriptor; + } + + @Override + public String getLocation() { + return mStorageDescriptor.getLocation(); + } + + @Override + public void setLocation(String location) { + mStorageDescriptor.setLocation(location); + } + } + + public static class MStorageDescriptorPropEntity implements LocationEntity { + private final String tablePropKey; + private Map parameters; + private MStorageDescriptor mStorageDescriptor; + + public MStorageDescriptorPropEntity(String tablePropKey) { + this.tablePropKey = tablePropKey; + } + + @Override + public void setEntity(MStorageDescriptor mStorageDescriptor) { + this.mStorageDescriptor = mStorageDescriptor; + this.parameters = mStorageDescriptor.getParameters(); + } + + @Override + public String getLocation() { + if (parameters.containsKey(tablePropKey)) { + return parameters.get(tablePropKey); + } else { + return null; + } + } + + @Override + public void setLocation(String location) { + Map newParams = new HashMap<>(); + newParams.putAll(parameters); + newParams.put(tablePropKey, location); + mStorageDescriptor.setParameters(newParams); + } + } + + public static class MSerDeInfoEntity implements LocationEntity { + private final 
String serdeProp; + private Map parameters; + private MSerDeInfo mSerDeInfo; + + public MSerDeInfoEntity(String serdeProp) { + this.serdeProp = serdeProp; + } + + @Override + public void setEntity(MSerDeInfo mSerde) { + this.mSerDeInfo = mSerde; + this.parameters = mSerde.getParameters(); + } + + @Override + public String getLocation() { + if (parameters.containsKey(serdeProp)) { + return parameters.get(serdeProp); + } else { + return null; + } + } + + @Override + public void setLocation(String location) { + Map newParams = new HashMap<>(); + newParams.putAll(parameters); + newParams.put(serdeProp, location); + mSerDeInfo.setParameters(newParams); + } + } +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/metatool/LocationUpdater.java metastore/src/java/org/apache/hadoop/hive/metastore/metatool/LocationUpdater.java new file mode 100644 index 0000000..7e1d3ff --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/metatool/LocationUpdater.java @@ -0,0 +1,94 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import org.apache.hadoop.fs.Path; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class LocationUpdater { + private List badRecords = new ArrayList(); + private Map updateLocations = new HashMap(); + + private final URI oldLoc; + private final URI newLoc; + private final boolean isDryRun; + private final boolean isSilentMode; + private final UriUpdateChecker updateChecker; + private final LocationEntity locationEntity; + private int numNullRecords; + private int numBadRecords; + private int numUpdatedRecords; + private boolean wasError = false; + + public LocationUpdater(UpdateParams updateParams, LocationEntity locationEntity) { + this.oldLoc = updateParams.oldLoc; + this.newLoc = updateParams.newLoc; + this.isDryRun = updateParams.isDryRun; + this.updateChecker = updateParams.updateChecker; + this.isSilentMode = updateParams.isSilentMode; 
+ this.locationEntity = locationEntity; + } + + public void process(T entity) { + locationEntity.setEntity(entity); + String location = locationEntity.getLocation(); + if (location == null) { // This can happen for View or Index + incNumNullRecords(); + return; + } + URI locationURI = null; + try { + locationURI = new Path(location).toUri(); + } catch (IllegalArgumentException e) { + } + if (locationURI == null) { + addBadRecord(location); + } else { + if (updateChecker.shouldUpdateURI(locationURI, oldLoc)) { + String newLocation = location.replaceAll(oldLoc.toString(), newLoc.toString()); + addUpdateLocation(location, newLocation); + if (!isDryRun) { + locationEntity.setLocation(newLocation); + } + } + } + } + + public void addBadRecord(String badRecord) { + if (!isSilentMode) { + this.badRecords.add(badRecord); + } + ++numBadRecords; + } + + public Map getUpdateLocations() { + return updateLocations; + } + + public void addUpdateLocation(String oldLocation, String newLocation) { + if (!isSilentMode) { + this.updateLocations.put(oldLocation, newLocation); + } + ++numUpdatedRecords; + } + + public void incNumNullRecords() { + ++this.numNullRecords; + } + + public ReturnValue getRetVal() { + return new ReturnValue(badRecords, updateLocations, numNullRecords, + numUpdatedRecords, numBadRecords); + } + + public boolean wasError() { + return wasError; + } + + public void errorHappened() { + wasError = true; + } +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/metatool/RetrieverIterable.java metastore/src/java/org/apache/hadoop/hive/metastore/metatool/RetrieverIterable.java new file mode 100644 index 0000000..6d8c09d --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/metatool/RetrieverIterable.java @@ -0,0 +1,5 @@ +package org.apache.hadoop.hive.metastore.metatool; + +public interface RetrieverIterable extends Iterable { + boolean isCommited(); +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/metatool/ReturnValue.java 
metastore/src/java/org/apache/hadoop/hive/metastore/metatool/ReturnValue.java new file mode 100644 index 0000000..c209d39 --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/metatool/ReturnValue.java @@ -0,0 +1,43 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ReturnValue { + private List badRecords = new ArrayList(); + private Map updateLocations = new HashMap(); + private int numNullRecords; + private int numUpdatedRecords; + private int numBadRecords; + + public ReturnValue(List badRecords, Map updateLocations, int numNullRecords, + int numUpdatedRecords, int numBadRecords) { + this.badRecords = badRecords; + this.updateLocations = updateLocations; + this.numNullRecords = numNullRecords; + this.numUpdatedRecords = numUpdatedRecords; + this.numBadRecords = numBadRecords; + } + + public List getBadRecords() { + return badRecords; + } + + public Map getUpdateLocations() { + return updateLocations; + } + + public int getNumNullRecords() { + return numNullRecords; + } + + public int getNumBadRecords() { + return numBadRecords; + } + + public int getNumUpdatedRecords() { + return numUpdatedRecords; + } +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/metatool/UpdateParams.java metastore/src/java/org/apache/hadoop/hive/metastore/metatool/UpdateParams.java new file mode 100644 index 0000000..96da3e7 --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/metatool/UpdateParams.java @@ -0,0 +1,20 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import java.net.URI; + +public class UpdateParams { + public final boolean isSilentMode; + public final URI oldLoc; + public final URI newLoc; + public final boolean isDryRun; + public final UriUpdateChecker updateChecker; + + public UpdateParams(UriUpdateChecker updateChecker, boolean isSilentMode, URI oldLoc, URI newLoc, + boolean isDryRun) { + 
this.oldLoc = oldLoc; + this.newLoc = newLoc; + this.isDryRun = isDryRun; + this.updateChecker = updateChecker; + this.isSilentMode = isSilentMode; + } +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/metatool/UriUpdateChecker.java metastore/src/java/org/apache/hadoop/hive/metastore/metatool/UriUpdateChecker.java new file mode 100644 index 0000000..e827a77 --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/metatool/UriUpdateChecker.java @@ -0,0 +1,41 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import java.net.URI; + +public class UriUpdateChecker { + public boolean shouldUpdateURI(URI onDiskUri, URI inputUri) { + String onDiskHost = onDiskUri.getHost(); + String inputHost = inputUri.getHost(); + + int onDiskPort = onDiskUri.getPort(); + int inputPort = inputUri.getPort(); + + String onDiskScheme = onDiskUri.getScheme(); + String inputScheme = inputUri.getScheme(); + + //compare ports + if (inputPort != -1) { + if (inputPort != onDiskPort) { + return false; + } + } + //compare schemes + if (inputScheme != null) { + if (onDiskScheme == null) { + return false; + } + if (!inputScheme.equalsIgnoreCase(onDiskScheme)) { + return false; + } + } + //compare hosts + if (onDiskHost != null) { + if (!inputHost.equalsIgnoreCase(onDiskHost)) { + return false; + } + } else { + return false; + } + return true; + } +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java metastore/src/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java index 22e246f..90e352e 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,7 +21,6 @@ import java.net.URI; import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; @@ -38,6 +37,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.metatool.*; +import org.apache.hadoop.hive.metastore.model.MDatabase; +import org.apache.hadoop.hive.metastore.model.MSerDeInfo; +import org.apache.hadoop.hive.metastore.model.MStorageDescriptor; +import org.apache.hadoop.hive.metastore.model.MTable; /** * This class provides Hive admins a tool to @@ -51,11 +55,20 @@ private final Options cmdLineOptions = new Options(); private ObjectStore objStore; private boolean isObjStoreInitialized; + private static final int DEFAULT_BLOCK_SIZE = 1000; + private int blockSize = DEFAULT_BLOCK_SIZE; public HiveMetaTool() { this.isObjStoreInitialized = false; } + // Use only for testing + void setObjectStore(ObjectStore objectStore, int blockSize) { + this.objStore = objectStore; + this.isObjStoreInitialized = true; + this.blockSize = blockSize; + } + @SuppressWarnings("static-access") private void init() { @@ -89,6 +102,8 @@ private void init() { Option dryRun = new Option("dryRun" , "Perform a dry run of updateLocation changes.When " + "run with the dryRun option updateLocation changes are displayed but not persisted. " + "dryRun is valid only with the updateLocation option."); + Option silentMode = new Option("silentMode", "Does not collect the changed records/bad records, only the " + + "number of changed records and bad records. 
If you have many tables use this"); Option serdePropKey = OptionBuilder.withArgName("serde-prop-key") .hasArgs() @@ -109,6 +124,7 @@ private void init() { cmdLineOptions.addOption(executeJDOQL); cmdLineOptions.addOption(updateFSRootLoc); cmdLineOptions.addOption(dryRun); + cmdLineOptions.addOption(silentMode); cmdLineOptions.addOption(serdePropKey); cmdLineOptions.addOption(tablePropKey); } @@ -192,133 +208,27 @@ private int printUpdateLocations(Map updateLocations) { return count; } - private void printTblURIUpdateSummary(ObjectStore.UpdateMStorageDescriptorTblURIRetVal retVal, - boolean isDryRun) { - String tblName = new String("SDS"); - String fieldName = new String("LOCATION"); - - if (retVal == null) { - System.err.println("Encountered error while executing updateMStorageDescriptorTblURI - " + - "commit of JDO transaction failed. Failed to update FSRoot locations in " + - fieldName + "field in " + tblName + " table."); - } else { - Map updateLocations = retVal.getUpdateLocations(); - if (isDryRun) { - System.out.println("Dry Run of updateLocation on table " + tblName + ".."); - } else { - System.out.println("Successfully updated the following locations.."); - } - int count = printUpdateLocations(updateLocations); - if (isDryRun) { - System.out.println("Found " + count + " records in " + tblName + " table to update"); - } else { - System.out.println("Updated " + count + " records in " + tblName + " table"); - } - List badRecords = retVal.getBadRecords(); - if (badRecords.size() > 0) { - System.err.println("Warning: Found records with bad " + fieldName + " in " + - tblName + " table.. "); - for (String badRecord:badRecords) { - System.err.println("bad location URI: " + badRecord); - } - } - int numNullRecords = retVal.getNumNullRecords(); - if (numNullRecords != 0) { - LOG.debug("Number of NULL location URI: " + numNullRecords + - ". 
This can happen for View or Index."); - } + private void printResults(LocationUpdater updater, boolean isDryRun, boolean isSilentMode) { + if (updater.wasError()) { + System.err.println("Encountered ERROR while executing, JDO transaction failed." + + "The data might not be consistent."); } - } - - private void printDatabaseURIUpdateSummary(ObjectStore.UpdateMDatabaseURIRetVal retVal, - boolean isDryRun) { - String tblName = new String("DBS"); - String fieldName = new String("LOCATION_URI"); - - if (retVal == null) { - System.err.println("Encountered error while executing updateMDatabaseURI - " + - "commit of JDO transaction failed. Failed to update FSRoot locations in " + - fieldName + "field in " + tblName + " table."); + ReturnValue retVal = updater.getRetVal(); + int count = retVal.getNumUpdatedRecords(); + if (isDryRun) { + System.out.println("Dry Run. No location was updated. Found " + count + " records to update"); } else { - Map updateLocations = retVal.getUpdateLocations(); - if (isDryRun) { - System.out.println("Dry Run of updateLocation on table " + tblName + ".."); - } else { - System.out.println("Successfully updated the following locations.."); - } - int count = printUpdateLocations(updateLocations); - if (isDryRun) { - System.out.println("Found " + count + " records in " + tblName + " table to update"); - } else { - System.out.println("Updated " + count + " records in " + tblName + " table"); - } - List badRecords = retVal.getBadRecords(); - if (badRecords.size() > 0) { - System.err.println("Warning: Found records with bad " + fieldName + " in " + - tblName + " table.. 
"); - for (String badRecord:badRecords) { - System.err.println("bad location URI: " + badRecord); - } - } + System.out.println("Successfully updated " + count + " records."); } - } - - private void printPropURIUpdateSummary(ObjectStore.UpdatePropURIRetVal retVal, String - tablePropKey, boolean isDryRun, String tblName, String methodName) { - if (retVal == null) { - System.err.println("Encountered error while executing " + methodName + " - " + - "commit of JDO transaction failed. Failed to update FSRoot locations in " + - "value field corresponding to" + tablePropKey + " in " + tblName + " table."); - } else { - Map updateLocations = retVal.getUpdateLocations(); - if (isDryRun) { - System.out.println("Dry Run of updateLocation on table " + tblName + ".."); - } else { - System.out.println("Successfully updated the following locations.."); - } - int count = printUpdateLocations(updateLocations); - if (isDryRun) { - System.out.println("Found " + count + " records in " + tblName + " table to update"); - } else { - System.out.println("Updated " + count + " records in " + tblName + " table"); - } - List badRecords = retVal.getBadRecords(); - if (badRecords.size() > 0) { - System.err.println("Warning: Found records with bad " + tablePropKey + " key in " + - tblName + " table.. "); - for (String badRecord:badRecords) { - System.err.println("bad location URI: " + badRecord); - } + if (!isSilentMode) { + for (Map.Entry entry : retVal.getUpdateLocations().entrySet()) { + System.out.println("old location: " + entry.getKey() + " new location: " + entry.getValue()); } } - } - - private void printSerdePropURIUpdateSummary(ObjectStore.UpdateSerdeURIRetVal retVal, - String serdePropKey, boolean isDryRun) { - String tblName = new String("SERDE_PARAMS"); - - if (retVal == null) { - System.err.println("Encountered error while executing updateSerdeURI - " + - "commit of JDO transaction failed. 
Failed to update FSRoot locations in " + - "value field corresponding to " + serdePropKey + " in " + tblName + " table."); - } else { - Map updateLocations = retVal.getUpdateLocations(); - if (isDryRun) { - System.out.println("Dry Run of updateLocation on table " + tblName + ".."); - } else { - System.out.println("Successfully updated the following locations.."); - } - int count = printUpdateLocations(updateLocations); - if (isDryRun) { - System.out.println("Found " + count + " records in " + tblName + " table to update"); - } else { - System.out.println("Updated " + count + " records in " + tblName + " table"); - } - List badRecords = retVal.getBadRecords(); - if (badRecords.size() > 0) { - System.err.println("Warning: Found records with bad " + serdePropKey + " key in " + - tblName + " table.. "); - for (String badRecord:badRecords) { + if (retVal.getNumBadRecords() > 0) { + System.err.println("Warning: Found " + retVal.getNumBadRecords() + " records."); + if (!isSilentMode) { + for (String badRecord : retVal.getBadRecords()) { System.err.println("bad location URI: " + badRecord); } } @@ -326,46 +236,70 @@ private void printSerdePropURIUpdateSummary(ObjectStore.UpdateSerdeURIRetVal ret } public void updateFSRootLocation(URI oldURI, URI newURI, String serdePropKey, String - tablePropKey, boolean isDryRun) { + tablePropKey, boolean isDryRun, boolean isSilentMode) { HiveConf hiveConf = new HiveConf(HiveMetaTool.class); initObjectStore(hiveConf); - System.out.println("Looking for LOCATION_URI field in DBS table to update.."); - ObjectStore.UpdateMDatabaseURIRetVal updateMDBURIRetVal = objStore.updateMDatabaseURI(oldURI, - newURI, isDryRun); - printDatabaseURIUpdateSummary(updateMDBURIRetVal, isDryRun); - - System.out.println("Looking for LOCATION field in SDS table to update.."); - ObjectStore.UpdateMStorageDescriptorTblURIRetVal updateTblURIRetVal = - objStore.updateMStorageDescriptorTblURI(oldURI, newURI, isDryRun); - printTblURIUpdateSummary(updateTblURIRetVal, 
isDryRun); + EntityUpdater entityUpdater = new EntityUpdater(objStore, blockSize); + UriUpdateChecker updateChecker = new UriUpdateChecker(); + UpdateParams updateParams = new UpdateParams(updateChecker, isSilentMode, oldURI, newURI, isDryRun); + updateMDatabaseURI(entityUpdater, updateParams); + updateMStorageDescriptorTblURI(entityUpdater, updateParams); if (tablePropKey != null) { - System.out.println("Looking for value of " + tablePropKey + " key in TABLE_PARAMS table " + - "to update.."); - ObjectStore.UpdatePropURIRetVal updateTblPropURIRetVal = - objStore.updateTblPropURI(oldURI, newURI, - tablePropKey, isDryRun); - printPropURIUpdateSummary(updateTblPropURIRetVal, tablePropKey, isDryRun, "TABLE_PARAMS", - "updateTblPropURI"); - - System.out.println("Looking for value of " + tablePropKey + " key in SD_PARAMS table " + - "to update.."); - ObjectStore.UpdatePropURIRetVal updatePropURIRetVal = objStore - .updateMStorageDescriptorTblPropURI(oldURI, newURI, tablePropKey, isDryRun); - printPropURIUpdateSummary(updatePropURIRetVal, tablePropKey, isDryRun, "SD_PARAMS", - "updateMStorageDescriptorTblPropURI"); + updateTblPropURI(tablePropKey, entityUpdater, updateParams); + updateMStorageDescriptorTblPropURI(tablePropKey, entityUpdater, updateParams); } - if (serdePropKey != null) { - System.out.println("Looking for value of " + serdePropKey + " key in SERDE_PARAMS table " + - "to update.."); - ObjectStore.UpdateSerdeURIRetVal updateSerdeURIretVal = objStore.updateSerdeURI(oldURI, - newURI, serdePropKey, isDryRun); - printSerdePropURIUpdateSummary(updateSerdeURIretVal, serdePropKey, isDryRun); + updateSerdeURI(serdePropKey, entityUpdater, updateParams); } } + private void updateMDatabaseURI(EntityUpdater entityUpdater, UpdateParams updateParams) { + System.out.println("Looking for LOCATION_URI field in DBS table to update.."); + LocationUpdater updateMDataBase = new LocationUpdater<>(updateParams, + new LocationEntityImplementations.MDatabaseEntity()); + 
entityUpdater.update(updateMDataBase, MDatabase.class); + printResults(updateMDataBase, updateParams.isDryRun, updateParams.isSilentMode); + } + + private void updateMStorageDescriptorTblURI(EntityUpdater entityUpdater, UpdateParams updateParams) { + System.out.println("Looking for LOCATION field in SDS table to update.."); + LocationUpdater updateMStorageDescriptor = new LocationUpdater<>(updateParams, + new LocationEntityImplementations.MStorageDescriptorEntity()); + entityUpdater.update(updateMStorageDescriptor, MStorageDescriptor.class); + printResults(updateMStorageDescriptor, updateParams.isDryRun, updateParams.isSilentMode); + } + + private void updateTblPropURI(String tablePropKey, EntityUpdater entityUpdater, UpdateParams updateParams) { + System.out.println("Looking for value of " + tablePropKey + " key in TABLE_PARAMS table " + + "to update.."); + LocationUpdater updateMTable = new LocationUpdater<>(updateParams, + new LocationEntityImplementations.MTableEntity(tablePropKey)); + entityUpdater.update(updateMTable, MTable.class); + printResults(updateMTable, updateParams.isDryRun, updateParams.isSilentMode); + } + + @Deprecated + private void updateMStorageDescriptorTblPropURI(String tablePropKey, EntityUpdater entityUpdater, + UpdateParams updateParams) { + System.out.println("Looking for value of " + tablePropKey + " key in SD_PARAMS table " + + "to update.."); + LocationUpdater updateMStorageDescriptorTablePropKey = new LocationUpdater<>(updateParams, + new LocationEntityImplementations.MStorageDescriptorPropEntity(tablePropKey)); + entityUpdater.update(updateMStorageDescriptorTablePropKey, MStorageDescriptor.class); + printResults(updateMStorageDescriptorTablePropKey, updateParams.isDryRun, updateParams.isSilentMode); + } + + private void updateSerdeURI(String serdePropKey, EntityUpdater entityUpdater, UpdateParams updateParams) { + System.out.println("Looking for value of " + serdePropKey + " key in SERDE_PARAMS table " + + "to update.."); + 
LocationUpdater updateMSerDeInfo = new LocationUpdater<>(updateParams, + new LocationEntityImplementations.MSerDeInfoEntity(serdePropKey)); + entityUpdater.update(updateMSerDeInfo, MSerDeInfo.class); + printResults(updateMSerDeInfo, updateParams.isDryRun, updateParams.isSilentMode); + } + private static void printAndExit(HiveMetaTool metaTool) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("metatool", metaTool.cmdLineOptions); @@ -393,6 +327,9 @@ public static void main(String[] args) { if (line.hasOption("dryRun")) { System.err.println("HiveMetaTool: dryRun is not valid with listFSRoot"); printAndExit(metaTool); + } else if (line.hasOption("silentMode")) { + System.err.println("HiveMetaTool: silentMode is not valid with listFSRoot"); + printAndExit(metaTool); } else if (line.hasOption("serdePropKey")) { System.err.println("HiveMetaTool: serdePropKey is not valid with listFSRoot"); printAndExit(metaTool); @@ -406,6 +343,9 @@ public static void main(String[] args) { if (line.hasOption("dryRun")) { System.err.println("HiveMetaTool: dryRun is not valid with executeJDOQL"); printAndExit(metaTool); + } else if (line.hasOption("silentMode")) { + System.err.println("HiveMetaTool: silentMode is not valid with executeJDOQL"); + printAndExit(metaTool); } else if (line.hasOption("serdePropKey")) { System.err.println("HiveMetaTool: serdePropKey is not valid with executeJDOQL"); printAndExit(metaTool); @@ -424,6 +364,7 @@ public static void main(String[] args) { } else if (line.hasOption("updateLocation")) { String[] loc = line.getOptionValues("updateLocation"); boolean isDryRun = false; + boolean isSilentMode = false; String serdepropKey = null; String tablePropKey = null; @@ -444,6 +385,10 @@ public static void main(String[] args) { isDryRun = true; } + if (line.hasOption("silentMode")) { + isSilentMode = true; + } + if (line.hasOption("serdePropKey")) { serdepropKey = line.getOptionValue("serdePropKey"); } @@ -461,7 +406,7 @@ public static void 
main(String[] args) { } else if (oldURI.getScheme() == null || newURI.getScheme() == null) { System.err.println("HiveMetaTool:A valid scheme is required in both old-loc and new-loc"); } else { - metaTool.updateFSRootLocation(oldURI, newURI, serdepropKey, tablePropKey, isDryRun); + metaTool.updateFSRootLocation(oldURI, newURI, serdepropKey, tablePropKey, isDryRun, isSilentMode); } } else { if (line.hasOption("dryRun")) { diff --git metastore/src/test/org/apache/hadoop/hive/metastore/metatool/BlockRetrieverIterableTest.java metastore/src/test/org/apache/hadoop/hive/metastore/metatool/BlockRetrieverIterableTest.java new file mode 100644 index 0000000..56f0a73 --- /dev/null +++ metastore/src/test/org/apache/hadoop/hive/metastore/metatool/BlockRetrieverIterableTest.java @@ -0,0 +1,136 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +public class BlockRetrieverIterableTest { + + void assertList(List expected, List actual) { + String[] expectedArray = expected.toArray(new String[expected.size()]); + String[] actualdArray = actual.toArray(new String[actual.size()]); + assertArrayEquals(expectedArray, actualdArray); + } + + @Test + public void testRemaingItems() { + List list = Arrays.asList("1", "2", "3", "4", + "5", "6", "7", "8", "9", "10"); + DataProviderStub dataProviderStub = new DataProviderStub(list); + BlockRetrieverIterable iterable = new BlockRetrieverIterable<>( + new BlockRetrieverIterator(dataProviderStub, 4)); + List result = new ArrayList<>(); + for (String s : iterable) { + result.add(s); + } + assertList(list, result); + List expectedEvents = Arrays.asList( + "open", + "get 0 4", + "commit", + "open", + "get 4 8", + "commit", + "open", + "get 8 12", + "commit", + "open", + "get 10 14", + "commit" + ); + assertList(expectedEvents, dataProviderStub.getEvents()); + } + + @Test + public void testExactLength() 
{ + List list = Arrays.asList("1", "2", "3", "4", + "5", "6", "7", "8"); + DataProviderStub dataProviderStub = new DataProviderStub(list); + BlockRetrieverIterable iterable = new BlockRetrieverIterable<>( + new BlockRetrieverIterator(dataProviderStub, 4)); + List result = new ArrayList<>(); + for (String s : iterable) { + result.add(s); + } + assertList(list, result); + List expectedEvents = Arrays.asList( + "open", + "get 0 4", + "commit", + "open", + "get 4 8", + "commit", + "open", + "get 8 12", + "commit" + ); + assertList(expectedEvents, dataProviderStub.getEvents()); + } + + @Test + public void testExactOneBlockLength() { + List list = Arrays.asList("1", "2", "3", "4"); + DataProviderStub dataProviderStub = new DataProviderStub(list); + BlockRetrieverIterable iterable = new BlockRetrieverIterable<>( + new BlockRetrieverIterator(dataProviderStub, 4)); + List result = new ArrayList<>(); + for (String s : iterable) { + result.add(s); + } + assertList(list, result); + List expectedEvents = Arrays.asList( + "open", + "get 0 4", + "commit", + "open", + "get 4 8", + "commit" + ); + assertList(expectedEvents, dataProviderStub.getEvents()); + } + + @Test + public void testEmptyList() { + List list = Arrays.asList(); + DataProviderStub dataProviderStub = new DataProviderStub(list); + BlockRetrieverIterable iterable = new BlockRetrieverIterable<>( + new BlockRetrieverIterator(dataProviderStub, 4)); + List result = new ArrayList<>(); + for (String s : iterable) { + result.add(s); + } + assertList(list, result); + List expectedEvents = Arrays.asList( + "open", + "get 0 4", + "commit" + ); + assertList(expectedEvents, dataProviderStub.getEvents()); + } + + @Test + public void testExactOnePartialList() { + List list = Arrays.asList("1", "2", "3"); + DataProviderStub dataProviderStub = new DataProviderStub(list); + BlockRetrieverIterable iterable = new BlockRetrieverIterable<>( + new BlockRetrieverIterator(dataProviderStub, 4)); + List result = new ArrayList<>(); + for 
(String s : iterable) { + result.add(s); + } + assertList(list, result); + List expectedEvents = Arrays.asList( + "open", + "get 0 4", + "commit", + "open", + "get 3 7", + "commit" + ); + assertList(expectedEvents, dataProviderStub.getEvents()); + } +} \ No newline at end of file diff --git metastore/src/test/org/apache/hadoop/hive/metastore/metatool/DataProviderStub.java metastore/src/test/org/apache/hadoop/hive/metastore/metatool/DataProviderStub.java new file mode 100644 index 0000000..f1dae6a --- /dev/null +++ metastore/src/test/org/apache/hadoop/hive/metastore/metatool/DataProviderStub.java @@ -0,0 +1,46 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import java.util.ArrayList; +import java.util.List; + +class DataProviderStub implements IDataProvider { + private final List list; + + private List events = new ArrayList<>(); + + public DataProviderStub(List list) { + this.list = list; + } + + @Override + public boolean commitTransaction() { + events.add("commit"); + return true; + } + + @Override + public void openTransaction() { + events.add("open"); + } + + @Override + public void rollbackTransaction() { + events.add("rollback"); + } + + @Override + public List getEntities(int from, int to) { + events.add("get " + from + " " + to); + return list.subList(Math.max(from, 0), Math.min(to, list.size())); + } + + @Override + public List getAllEntities() { + events.add("get all"); + return list; + } + + public List getEvents() { + return events; + } +} diff --git metastore/src/test/org/apache/hadoop/hive/metastore/metatool/EntityUpdaterTest.java metastore/src/test/org/apache/hadoop/hive/metastore/metatool/EntityUpdaterTest.java new file mode 100644 index 0000000..a962d6e --- /dev/null +++ metastore/src/test/org/apache/hadoop/hive/metastore/metatool/EntityUpdaterTest.java @@ -0,0 +1,67 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import org.apache.hadoop.hive.metastore.ObjectStore; +import org.junit.Test; +import org.mockito.ArgumentCaptor; 
+import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.junit.Assert.*; + +public class EntityUpdaterTest { + + private static class ListRetrieverIterable implements RetrieverIterable { + private final List list; + + public ListRetrieverIterable(List list) { + this.list = list; + } + + @Override + public Iterator iterator() { + return list.iterator(); + } + + public boolean isCommited() { + return true; + } + } + + private List getStringList(int size) { + List list = new ArrayList<>(); + for (int i = 0; i < size; ++i) { + list.add("" + i); + } + return list; + } + + private void checkList(List expexted, List actual) { + assertEquals(expexted.size(), actual.size()); + for (int i = 0; i < expexted.size(); ++i) { + assertEquals(expexted.get(i), actual.get(i)); + } + } + + @Test + public void test() { + List list = getStringList(30); + ListRetrieverIterable retrieverIterable = new ListRetrieverIterable<>(list); + + ObjectStore objectStore = Mockito.mock(ObjectStore.class); + Mockito.when(objectStore.getBlockRetriever(String.class, 20)).thenReturn(retrieverIterable); + + EntityUpdater entityUpdater = new EntityUpdater(objectStore, 20); + + LocationUpdater locationUpdater= Mockito.mock(LocationUpdater.class); + + entityUpdater.update(locationUpdater, String.class); + + ArgumentCaptor arguments = ArgumentCaptor.forClass(String.class); + Mockito.verify(locationUpdater, Mockito.times(list.size())).process(arguments.capture()); + + checkList(list, arguments.getAllValues()); + } +} \ No newline at end of file diff --git metastore/src/test/org/apache/hadoop/hive/metastore/metatool/LocationEntityImplementationsTest.java metastore/src/test/org/apache/hadoop/hive/metastore/metatool/LocationEntityImplementationsTest.java new file mode 100644 index 0000000..d11ce49 --- /dev/null +++ metastore/src/test/org/apache/hadoop/hive/metastore/metatool/LocationEntityImplementationsTest.java @@ -0,0 +1,133 @@ 
+package org.apache.hadoop.hive.metastore.metatool; + +import org.apache.hadoop.hive.metastore.model.MDatabase; +import org.apache.hadoop.hive.metastore.model.MSerDeInfo; +import org.apache.hadoop.hive.metastore.model.MStorageDescriptor; +import org.apache.hadoop.hive.metastore.model.MTable; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; +import static org.apache.hadoop.hive.metastore.metatool.LocationEntityImplementations.*; + +public class LocationEntityImplementationsTest { + @Test + public void testMDatabaseEntity() { + MDatabase mDataBase = mock(MDatabase.class); + LocationEntity loctionEntity = new MDatabaseEntity(); + + when(mDataBase.getLocationUri()).thenReturn("mockUri"); + loctionEntity.setEntity(mDataBase); + assertEquals("mockUri", loctionEntity.getLocation()); + + loctionEntity.setLocation("newMockUri"); + verify(mDataBase, Mockito.times(1)).setLocationUri("newMockUri"); + } + + @Test + public void testMTableEntity() { + MTable mTable = mock(MTable.class); + LocationEntity loctionEntity = new MTableEntity("foo"); + Map parameters = new HashMap<>(); + parameters.put("foo", "bar"); + + when(mTable.getParameters()).thenReturn(parameters); + loctionEntity.setEntity(mTable); + + assertEquals("bar", loctionEntity.getLocation()); + + loctionEntity.setLocation("newLocation"); + Map newParameters = new HashMap<>(); + newParameters.put("foo", "newLocation"); + verify(mTable).setParameters(eq(newParameters)); + } + + @Test + public void testMStorageDescriptorEntity() { + MStorageDescriptor mStorageDescriptor = mock(MStorageDescriptor.class); + LocationEntity loctionEntity = new MStorageDescriptorEntity(); + + when(mStorageDescriptor.getLocation()).thenReturn("mockLocation"); + loctionEntity.setEntity(mStorageDescriptor); + assertEquals("mockLocation", loctionEntity.getLocation()); + + loctionEntity.setLocation("newMockLocation"); + 
verify(mStorageDescriptor, Mockito.times(1)).setLocation("newMockLocation"); + } + + @Test + public void testMTableEntityKeyNotFound() { + MTable mTable = mock(MTable.class); + LocationEntity loctionEntity = new MTableEntity("foo"); + Map parameters = new HashMap<>(); + + when(mTable.getParameters()).thenReturn(parameters); + loctionEntity.setEntity(mTable); + + assertNull(loctionEntity.getLocation()); + } + + @Test + public void testMStorageDescriptorPropEntity() { + MStorageDescriptor mStorageDescriptor = mock(MStorageDescriptor.class); + LocationEntity loctionEntity = new MStorageDescriptorPropEntity("foo"); + Map parameters = new HashMap<>(); + parameters.put("foo", "bar"); + + when(mStorageDescriptor.getParameters()).thenReturn(parameters); + loctionEntity.setEntity(mStorageDescriptor); + + assertEquals("bar", loctionEntity.getLocation()); + + loctionEntity.setLocation("newLocation"); + Map newParameters = new HashMap<>(); + newParameters.put("foo", "newLocation"); + verify(mStorageDescriptor).setParameters(eq(newParameters)); + } + + @Test + public void testMStorageDescriptorEntityPropKeyNotFound() { + MStorageDescriptor mStorageDescriptor = mock(MStorageDescriptor.class); + LocationEntity loctionEntity = new MStorageDescriptorPropEntity("foo"); + Map parameters = new HashMap<>(); + + when(mStorageDescriptor.getParameters()).thenReturn(parameters); + loctionEntity.setEntity(mStorageDescriptor); + + assertNull(loctionEntity.getLocation()); + } + + @Test + public void testMSerDeInfoEntity() { + MSerDeInfo mSerDeInfo = mock(MSerDeInfo.class); + LocationEntity loctionEntity = new MSerDeInfoEntity("foo"); + Map parameters = new HashMap<>(); + parameters.put("foo", "bar"); + + when(mSerDeInfo.getParameters()).thenReturn(parameters); + loctionEntity.setEntity(mSerDeInfo); + + assertEquals("bar", loctionEntity.getLocation()); + + loctionEntity.setLocation("newLocation"); + Map newParameters = new HashMap<>(); + newParameters.put("foo", "newLocation"); + 
verify(mSerDeInfo).setParameters(eq(newParameters)); + } + + @Test + public void testMSerDeInfoEntityKeyNotFound() { + MSerDeInfo mSerDeInfo = mock(MSerDeInfo.class); + LocationEntity loctionEntity = new MSerDeInfoEntity("foo"); + Map parameters = new HashMap<>(); + + when(mSerDeInfo.getParameters()).thenReturn(parameters); + loctionEntity.setEntity(mSerDeInfo); + + assertNull(loctionEntity.getLocation()); + } +} diff --git metastore/src/test/org/apache/hadoop/hive/metastore/metatool/LocationUpdaterTest.java metastore/src/test/org/apache/hadoop/hive/metastore/metatool/LocationUpdaterTest.java new file mode 100644 index 0000000..05b2e4a --- /dev/null +++ metastore/src/test/org/apache/hadoop/hive/metastore/metatool/LocationUpdaterTest.java @@ -0,0 +1,187 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.model.MDatabase; +import org.junit.Test; +import org.mockito.Mockito; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +public class LocationUpdaterTest { + @Test + public void test() { + UriUpdateChecker updateChecker = Mockito.mock(UriUpdateChecker.class); + Mockito.when(updateChecker.shouldUpdateURI(Mockito.any(URI.class), Mockito.any(URI.class))).thenReturn(true, + false); + + boolean isSilentMode = false; + URI oldURI = new Path("hdfs://a.b.com:8020/a/b/").toUri(); + URI newURI = new Path("hdfs://a.b.com:8020/x/y/").toUri(); + boolean isDryRun = false; + UpdateParams updateParams = new UpdateParams(updateChecker, isSilentMode, oldURI, newURI, isDryRun); + + LocationUpdater updateMDatabaseURIRetVal = new LocationUpdater<>(updateParams, + new LocationEntityImplementations.MDatabaseEntity()); + + MDatabase mDatabase1 = Mockito.mock(MDatabase.class); + MDatabase mDatabase2 = Mockito.mock(MDatabase.class); + MDatabase mDatabase3 = Mockito.mock(MDatabase.class); + MDatabase mDatabase4 = Mockito.mock(MDatabase.class); + 
Mockito.when(mDatabase1.getLocationUri()).thenReturn("hdfs://a.b.com:8020/a/b/c/d"); + // this won't be updated, and it is not a bad record + Mockito.when(mDatabase2.getLocationUri()).thenReturn("hdfs://a.b.com:8021/a/b/c/d"); + Mockito.when(mDatabase3.getLocationUri()).thenReturn("a.b.com:1234/a/b/c/d"); + Mockito.when(mDatabase4.getLocationUri()).thenReturn(null); + + updateMDatabaseURIRetVal.process(mDatabase1); + updateMDatabaseURIRetVal.process(mDatabase2); + updateMDatabaseURIRetVal.process(mDatabase3); + updateMDatabaseURIRetVal.process(mDatabase4); + + Mockito.verify(mDatabase1).setLocationUri("hdfs://a.b.com:8020/x/y/c/d"); + + List badRecords = updateMDatabaseURIRetVal.getRetVal().getBadRecords(); + assertEquals(1, badRecords.size()); + assertEquals("a.b.com:1234/a/b/c/d", badRecords.get(0)); + assertEquals(1, updateMDatabaseURIRetVal.getRetVal().getNumBadRecords()); + + Map updateLocations = updateMDatabaseURIRetVal.getRetVal().getUpdateLocations(); + assertEquals(1, updateLocations.size()); + assertEquals("hdfs://a.b.com:8020/x/y/c/d", updateLocations.get("hdfs://a.b.com:8020/a/b/c/d")); + assertEquals(1, updateMDatabaseURIRetVal.getRetVal().getNumUpdatedRecords()); + + assertEquals(1, updateMDatabaseURIRetVal.getRetVal().getNumNullRecords()); + } + + @Test + public void testSilentMode() { + UriUpdateChecker updateChecker = Mockito.mock(UriUpdateChecker.class); + Mockito.when(updateChecker.shouldUpdateURI(Mockito.any(URI.class), Mockito.any(URI.class))).thenReturn(true, + false); + + boolean isSilentMode = true; + URI oldURI = new Path("hdfs://a.b.com:8020/a/b/").toUri(); + URI newURI = new Path("hdfs://a.b.com:8020/x/y/").toUri(); + boolean isDryRun = false; + UpdateParams updateParams = new UpdateParams(updateChecker, isSilentMode, oldURI, newURI, isDryRun); + + LocationUpdater updateMDatabaseURIRetVal = new LocationUpdater<>(updateParams, + new LocationEntityImplementations.MDatabaseEntity()); + + MDatabase mDatabase1 = Mockito.mock(MDatabase.class); 
+ MDatabase mDatabase2 = Mockito.mock(MDatabase.class); + MDatabase mDatabase3 = Mockito.mock(MDatabase.class); + Mockito.when(mDatabase1.getLocationUri()).thenReturn("hdfs://a.b.com:8020/a/b/c/d"); + // this won't be updated, and it is not a bad record + Mockito.when(mDatabase2.getLocationUri()).thenReturn("hdfs://a.b.com:8021/a/b/c/d"); + Mockito.when(mDatabase3.getLocationUri()).thenReturn("a.b.com:1234/a/b/c/d"); + + updateMDatabaseURIRetVal.process(mDatabase1); + updateMDatabaseURIRetVal.process(mDatabase2); + updateMDatabaseURIRetVal.process(mDatabase3); + + Mockito.verify(mDatabase1).setLocationUri("hdfs://a.b.com:8020/x/y/c/d"); + + List badRecords = updateMDatabaseURIRetVal.getRetVal().getBadRecords(); + assertEquals(0, badRecords.size()); + assertEquals(1, updateMDatabaseURIRetVal.getRetVal().getNumBadRecords()); + + Map updateLocations = updateMDatabaseURIRetVal.getRetVal().getUpdateLocations(); + assertEquals(0, updateLocations.size()); + assertEquals(1, updateMDatabaseURIRetVal.getRetVal().getNumUpdatedRecords()); + + assertEquals(0, updateMDatabaseURIRetVal.getRetVal().getNumNullRecords()); + } + + @Test + public void testDryrunMode() { + UriUpdateChecker updateChecker = Mockito.mock(UriUpdateChecker.class); + Mockito.when(updateChecker.shouldUpdateURI(Mockito.any(URI.class), Mockito.any(URI.class))).thenReturn(true, + false); + + boolean isSilentMode = false; + URI oldURI = new Path("hdfs://a.b.com:8020/a/b/").toUri(); + URI newURI = new Path("hdfs://a.b.com:8020/x/y/").toUri(); + boolean isDryRun = true; + UpdateParams updateParams = new UpdateParams(updateChecker, isSilentMode, oldURI, newURI, isDryRun); + + LocationUpdater updateMDatabaseURIRetVal = new LocationUpdater<>(updateParams, + new LocationEntityImplementations.MDatabaseEntity()); + + MDatabase mDatabase1 = Mockito.mock(MDatabase.class); + MDatabase mDatabase2 = Mockito.mock(MDatabase.class); + MDatabase mDatabase3 = Mockito.mock(MDatabase.class); + MDatabase mDatabase4 = 
Mockito.mock(MDatabase.class); + MDatabase mDatabase5 = Mockito.mock(MDatabase.class); + Mockito.when(mDatabase1.getLocationUri()).thenReturn("hdfs://a.b.com:8020/a/b/c/d"); + // this won't be updated, and it is not a bad record + Mockito.when(mDatabase2.getLocationUri()).thenReturn("hdfs://a.b.com:8021/a/b/c/d"); + Mockito.when(mDatabase3.getLocationUri()).thenReturn("a.b.com:1234/a/b/c/d"); + Mockito.when(mDatabase4.getLocationUri()).thenReturn(null); + Mockito.when(mDatabase5.getLocationUri()).thenReturn(null); + + updateMDatabaseURIRetVal.process(mDatabase1); + updateMDatabaseURIRetVal.process(mDatabase2); + updateMDatabaseURIRetVal.process(mDatabase3); + updateMDatabaseURIRetVal.process(mDatabase4); + updateMDatabaseURIRetVal.process(mDatabase5); + + Mockito.verify(mDatabase1, Mockito.never()).setLocationUri("hdfs://a.b.com:8020/x/y/c/d"); + + List badRecords = updateMDatabaseURIRetVal.getRetVal().getBadRecords(); + assertEquals(1, badRecords.size()); + assertEquals("a.b.com:1234/a/b/c/d", badRecords.get(0)); + assertEquals(1, updateMDatabaseURIRetVal.getRetVal().getNumBadRecords()); + + Map updateLocations = updateMDatabaseURIRetVal.getRetVal().getUpdateLocations(); + assertEquals(1, updateLocations.size()); + assertEquals("hdfs://a.b.com:8020/x/y/c/d", updateLocations.get("hdfs://a.b.com:8020/a/b/c/d")); + assertEquals(1, updateMDatabaseURIRetVal.getRetVal().getNumUpdatedRecords()); + + assertEquals(2, updateMDatabaseURIRetVal.getRetVal().getNumNullRecords()); + } + + @Test + public void testDryrunAndSilentMode() { + UriUpdateChecker updateChecker = Mockito.mock(UriUpdateChecker.class); + Mockito.when(updateChecker.shouldUpdateURI(Mockito.any(URI.class), Mockito.any(URI.class))).thenReturn(true, + false); + + boolean isSilentMode = true; + URI oldURI = new Path("hdfs://a.b.com:8020/a/b/").toUri(); + URI newURI = new Path("hdfs://a.b.com:8020/x/y/").toUri(); + boolean isDryRun = true; + UpdateParams updateParams = new UpdateParams(updateChecker, isSilentMode, 
oldURI, newURI, isDryRun); + + LocationUpdater updateMDatabaseURIRetVal = new LocationUpdater<>(updateParams, + new LocationEntityImplementations.MDatabaseEntity()); + + MDatabase mDatabase1 = Mockito.mock(MDatabase.class); + MDatabase mDatabase2 = Mockito.mock(MDatabase.class); + MDatabase mDatabase3 = Mockito.mock(MDatabase.class); + Mockito.when(mDatabase1.getLocationUri()).thenReturn("hdfs://a.b.com:8020/a/b/c/d"); + // this won't be updated, and it is not a bad record + Mockito.when(mDatabase2.getLocationUri()).thenReturn("hdfs://a.b.com:8021/a/b/c/d"); + Mockito.when(mDatabase3.getLocationUri()).thenReturn("a.b.com:1234/a/b/c/d"); + + updateMDatabaseURIRetVal.process(mDatabase1); + updateMDatabaseURIRetVal.process(mDatabase2); + updateMDatabaseURIRetVal.process(mDatabase3); + + Mockito.verify(mDatabase1, Mockito.never()).setLocationUri("hdfs://a.b.com:8020/x/y/c/d"); + + List badRecords = updateMDatabaseURIRetVal.getRetVal().getBadRecords(); + assertEquals(0, badRecords.size()); + assertEquals(1, updateMDatabaseURIRetVal.getRetVal().getNumBadRecords()); + + Map updateLocations = updateMDatabaseURIRetVal.getRetVal().getUpdateLocations(); + assertEquals(0, updateLocations.size()); + assertEquals(1, updateMDatabaseURIRetVal.getRetVal().getNumUpdatedRecords()); + + assertEquals(0, updateMDatabaseURIRetVal.getRetVal().getNumNullRecords()); + } +} \ No newline at end of file diff --git metastore/src/test/org/apache/hadoop/hive/metastore/metatool/ReturnValueTest.java metastore/src/test/org/apache/hadoop/hive/metastore/metatool/ReturnValueTest.java new file mode 100644 index 0000000..78002c1 --- /dev/null +++ metastore/src/test/org/apache/hadoop/hive/metastore/metatool/ReturnValueTest.java @@ -0,0 +1,26 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +public class ReturnValueTest { + @Test + 
public void test() { + List badREcords = new ArrayList<>(); + badREcords.add("qwe"); + Map updateRecords = new HashMap<>(); + updateRecords.put("aaa", "bbb"); + ReturnValue returnValue = new ReturnValue(badREcords, updateRecords, 42, 100, 1234); + assertSame(badREcords, returnValue.getBadRecords()); + assertSame(updateRecords, returnValue.getUpdateLocations()); + assertEquals(42, returnValue.getNumNullRecords()); + assertEquals(100, returnValue.getNumUpdatedRecords()); + assertEquals(1234, returnValue.getNumBadRecords()); + } +} diff --git metastore/src/test/org/apache/hadoop/hive/metastore/metatool/UriUpdateCheckerTest.java metastore/src/test/org/apache/hadoop/hive/metastore/metatool/UriUpdateCheckerTest.java new file mode 100644 index 0000000..06952b9 --- /dev/null +++ metastore/src/test/org/apache/hadoop/hive/metastore/metatool/UriUpdateCheckerTest.java @@ -0,0 +1,65 @@ +package org.apache.hadoop.hive.metastore.metatool; + +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +import java.net.URI; +import java.net.URISyntaxException; + +import static org.junit.Assert.*; + +public class UriUpdateCheckerTest { + + boolean checkLocations(UriUpdateChecker updateChecker, String oldLoc, String newLoc) { + URI oldUri = new Path(oldLoc).toUri(); + URI newUri = new Path(newLoc).toUri(); + return updateChecker.shouldUpdateURI(oldUri, newUri); + } + + @Test + public void testValidURI() { + URI uri = new Path("asd://abc.qwe.com:1234/q/w/e").toUri(); + assertEquals("asd", uri.getScheme()); + assertEquals("abc.qwe.com", uri.getHost()); + assertEquals(1234, uri.getPort()); + assertEquals("/q/w/e", uri.getPath()); + } + + @Test + public void testURINoPort() { + URI uri = new Path("asd://abc.qwe.com/q/w/e").toUri(); + assertEquals("asd", uri.getScheme()); + assertEquals("abc.qwe.com", uri.getHost()); + assertEquals(-1, uri.getPort()); + assertEquals("/q/w/e", uri.getPath()); + } + + @Test + public void testURIBadHost() { + URI uri = new 
Path("asd://bahostformat....com:1234/q/w/e").toUri(); + assertEquals("asd", uri.getScheme()); + assertEquals(null, uri.getHost()); + assertEquals(-1, uri.getPort()); + assertEquals("/q/w/e", uri.getPath()); + } + + @Test(expected = java.lang.IllegalArgumentException.class) + public void testURINoScheme() { + URI uri = new Path("a.b.com:1234").toUri(); + } + + @Test(expected = java.lang.IllegalArgumentException.class) + public void testURINoSchemew() { + URI uri = new Path("a.b.com:1234/a/b/c/d").toUri(); + } + + @Test + public void test() throws URISyntaxException { + UriUpdateChecker updateChecker = new UriUpdateChecker(); + assertFalse(checkLocations(updateChecker, "hdfs://abc.def.com:1234/a/b/c", "hdfs://xyz.wzy.com:1234/a/b/c")); + assertFalse(checkLocations(updateChecker, "hdfs://abc.def.com:1234/a/b/c", "hdfs://abc.def.com:9876/a/b/c")); + assertFalse(checkLocations(updateChecker, "hdfs://abc.def.com:1234/a/b/c", "hdfs2://abc.def.com:1234/a/b/c")); + assertTrue(checkLocations(updateChecker, "hdfs://abc.def.com:1234/q/w/e/r/t", "hdfs://abc.def.com:1234/a/b/c")); + assertTrue(checkLocations(updateChecker, "hdfs://abc.def.com:1234/q/w/e", "hdfs://abc.def.com:1234/q/w/e")); + } +} diff --git metastore/src/test/org/apache/hadoop/hive/metastore/tools/HiveMetaToolTest.java metastore/src/test/org/apache/hadoop/hive/metastore/tools/HiveMetaToolTest.java new file mode 100644 index 0000000..4125b56 --- /dev/null +++ metastore/src/test/org/apache/hadoop/hive/metastore/tools/HiveMetaToolTest.java @@ -0,0 +1,282 @@ +package org.apache.hadoop.hive.metastore.tools; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.metatool.BlockRetrieverIterable; +import org.apache.hadoop.hive.metastore.metatool.BlockRetrieverIterator; +import org.apache.hadoop.hive.metastore.metatool.IDataProvider; +import org.apache.hadoop.hive.metastore.metatool.RetrieverIterable; +import 
org.apache.hadoop.hive.metastore.model.*; +import org.junit.Test; + +import javax.jdo.*; +import java.io.File; +import java.net.URI; +import java.sql.*; +import java.sql.Connection; +import java.util.*; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class HiveMetaToolTest { + static String dbName = System.getProperty("java.io.tmpdir") + File.separator + "test_metatool-" + + System.currentTimeMillis(); + + private void printRS(ResultSet rs) throws SQLException { + int num = 0; + int colNum = rs.getMetaData().getColumnCount(); + while (rs.next()) { + StringBuilder sb = new StringBuilder(); + sb.append(++num); + for (int i = 1; i <= colNum; ++i) { + sb.append(", "); + sb.append(rs.getString(i)); + } + System.out.println(sb.toString()); + } + } + + private static RetrieverIterable getBlockRetriever(PersistenceManager pm, Class clazz, int blockSize) { + return new BlockRetrieverIterable<>(new BlockRetrieverIterator(new TestDataProvider<>(pm, clazz), + blockSize)); + } + + @Test + public void test3() throws ClassNotFoundException, SQLException { + String driver = "org.apache.derby.jdbc.EmbeddedDriver"; + Class.forName(driver); + + String connectionURL = "jdbc:derby:" + dbName + ";create=true"; + Connection conn = DriverManager.getConnection(connectionURL); + + // drop("DBS", conn); + // drop("DATABASE_PARAMS", conn); + + Map properties = new HashMap<>(); + properties.put("javax.jdo.PersistenceManagerFactoryClass", "org.datanucleus.api.jdo.JDOPersistenceManagerFactory"); + properties.put("javax.jdo.option.ConnectionDriverName", driver); + properties.put("javax.jdo.option.ConnectionURL", connectionURL); + properties.put("datanucleus.autoCreateSchema", "true"); + + PersistenceManagerFactory pmf = JDOHelper.getPersistenceManagerFactory(properties); + PersistenceManager pm = pmf.getPersistenceManager(); + + Transaction tx = pm.currentTransaction(); + tx.begin(); + + createMDatabase(pm, "db1", "abc.com:8020/a/b/c", "tkey 
http://abc.com:8020/aa/b/c1|k1 v1"); + createMDatabase(pm, "db2", "http://abc.com:8020/aa/b/c1", "k1 v1|k2 v2"); + createMDatabase(pm, "db3", "http://abc.com:8020/aa/b/c2", "tkey http://abc.com:8020/aa/b/c2|k1 v1|k2 v2"); + createMDatabase(pm, "db4", "abc.com:8020/a/b/c3", "k1 v1|k2 v2|k3 v3"); + createMDatabase(pm, "db5", "http://abc.com:8020/a/b/c4", "k1 v1|tkey http://abc.com:8020/aa/b/c3|k2 v2"); + createMDatabase(pm, "db6", "http://abc.com:8020/a/b/c4", "k1 v1|k2 v2|k3 v3"); + + createMTable(pm, "t1", "tkey http://abc.com:8020/a/b/c1|k1 v1|k2 v2"); + createMTable(pm, "t2", "k1 v1|k2 v2|k3 v3"); + createMTable(pm, "t3", "tkey http://abc.com:8020/aa/b/c2|k1 v1|k2 v2"); + + createMStorageDescriptor(pm, "http://abc.com:8020/aa/b/c1", "k1 v1|k2 v2|k3 v3"); + createMStorageDescriptor(pm, "http://abc.com:8020/aa/b/c2", "tkey http://abc.com:8020/aa/b/c4|k1 v1|k2 v2"); + createMStorageDescriptor(pm, "http://abc.com:8020/a/b/c3", "tkey http://abc.com:8020/a/b/c5|k1 v1"); + + createMSerDeInfo(pm, "mdsi1", "k1 v1|skey http://abc.com:8020/a/b/c1|k2 v2"); + createMSerDeInfo(pm, "mdsi2", "k1 v1|k2 v2|k3 v3"); + createMSerDeInfo(pm, "mdsi3", "k1 v1|k2 v2|skey http://abc.com:8020/aa/b/c2"); + + tx.commit(); + + HiveMetaTool hiveMetaTool = new HiveMetaTool(); + ObjectStore objectStore = mock(ObjectStore.class); + hiveMetaTool.setObjectStore(objectStore, 5); + + when(objectStore.getBlockRetriever(MDatabase.class, 5)) + .thenReturn(getBlockRetriever(MDatabase.class, 5, pm)); + when(objectStore.getBlockRetriever(MStorageDescriptor.class, 5)) + .thenReturn(getBlockRetriever(MStorageDescriptor.class, 5, pm)) + .thenReturn(getBlockRetriever(MStorageDescriptor.class, 5, pm)); + when(objectStore.getBlockRetriever(MTable.class, 5)) + .thenReturn(getBlockRetriever(MTable.class, 5, pm)); + when(objectStore.getBlockRetriever(MSerDeInfo.class, 5)) + .thenReturn(getBlockRetriever(MSerDeInfo.class, 5, pm)); + + URI oldURI = new Path("http://abc.com:8020/aa").toUri(); + URI newURI = new 
Path("http://zxy.org:9999/xx").toUri(); + String serdepropKey = "skey"; + String tablePropKey = "tkey"; + boolean isDryRun = false; + boolean isSilentMode = false; + hiveMetaTool.updateFSRootLocation(oldURI, newURI, serdepropKey, tablePropKey, isDryRun, isSilentMode); + + List dbs = getAll(pm, MDatabase.class); + List dbsOutput = new ArrayList<>(); + for (MDatabase db : dbs) { + dbsOutput.add(db.getName() + " " + db.getLocationUri() + " " + mapToString(db.getParameters())); + } + String[] dbsExpected = new String[]{ + "db1 abc.com:8020/a/b/c k1 v1|tkey http://abc.com:8020/aa/b/c1", + "db2 http://zxy.org:9999/xx/b/c1 k1 v1|k2 v2", + "db3 http://zxy.org:9999/xx/b/c2 k1 v1|k2 v2|tkey http://abc.com:8020/aa/b/c2", + "db4 abc.com:8020/a/b/c3 k1 v1|k2 v2|k3 v3", + "db5 http://abc.com:8020/a/b/c4 k1 v1|k2 v2|tkey http://abc.com:8020/aa/b/c3", + "db6 http://abc.com:8020/a/b/c4 k1 v1|k2 v2|k3 v3" + }; + assertEqualsAparFromOrder(dbsExpected, dbsOutput); + + List tables = getAll(pm, MTable.class); + List tablesOutput = new ArrayList<>(); + for (MTable t : tables) { + tablesOutput.add(t.getTableName() + " " + mapToString(t.getParameters())); + } + String[] tablesExpected = new String[]{ + "t1 k1 v1|k2 v2|tkey http://abc.com:8020/a/b/c1", + "t2 k1 v1|k2 v2|k3 v3", + "t3 k1 v1|k2 v2|tkey http://zxy.org:9999/xx/b/c2" + }; + assertEqualsAparFromOrder(tablesExpected, tablesOutput); + + + List sds = getAll(pm, MStorageDescriptor.class); + List sdsOutput = new ArrayList<>(); + for (MStorageDescriptor sd : sds) { + sdsOutput.add(sd.getLocation() + " " + mapToString(sd.getParameters())); + } + String[] sdsExpected = new String[]{ + "http://zxy.org:9999/xx/b/c1 k1 v1|k2 v2|k3 v3", + "http://zxy.org:9999/xx/b/c2 k1 v1|k2 v2|tkey http://zxy.org:9999/xx/b/c4", + "http://abc.com:8020/a/b/c3 k1 v1|tkey http://abc.com:8020/a/b/c5" + }; + assertEqualsAparFromOrder(sdsExpected, sdsOutput); + + List serdes = getAll(pm, MSerDeInfo.class); + List serdesOutput = new ArrayList<>(); + for 
(MSerDeInfo serde : serdes) { + serdesOutput.add(serde.getName() + " " + mapToString(serde.getParameters())); + } + String[] serdesExpected = new String[]{ + "mdsi1 k1 v1|k2 v2|skey http://abc.com:8020/a/b/c1", + "mdsi2 k1 v1|k2 v2|k3 v3", + "mdsi3 k1 v1|k2 v2|skey http://zxy.org:9999/xx/b/c2" + }; + assertEqualsAparFromOrder(serdesExpected, serdesOutput); + } + + private void assertEqualsAparFromOrder(String[] dbExpected, List dbOutput) { + Arrays.sort(dbExpected); + Collections.sort(dbOutput); + assertEquals(dbExpected.length, dbOutput.size()); + for (int i = 0; i < dbExpected.length; ++i) { + assertEquals(dbExpected[i], dbOutput.get(i)); + } + } + + private List getAll(PersistenceManager pm, Class clazz) { + Transaction tx = pm.currentTransaction(); + tx.begin(); + Query query = pm.newQuery(clazz); + List entities = (List) query.execute(); + pm.retrieveAll(entities); + tx.commit(); + return entities; + } + + private MDatabase createMDatabase(PersistenceManager pm, String name, String location, String params) { + MDatabase md = new MDatabase(name, location, "dummy description", stringToMap(params)); + pm.makePersistent(md); + return md; + } + + private MTable createMTable(PersistenceManager pm, String name, String params) { + MTable table = new MTable(); //(name, location, "dummy description", stringToMap(params)); + table.setTableName(name); + table.setParameters(stringToMap(params)); + pm.makePersistent(table); + return table; + } + + private MStorageDescriptor createMStorageDescriptor(PersistenceManager pm, String location, + String params) { + MStorageDescriptor msd = new MStorageDescriptor(); + msd.setLocation(location); + msd.setParameters(stringToMap(params)); + pm.makePersistent(msd); + return msd; + } + + private MSerDeInfo createMSerDeInfo(PersistenceManager pm, String name, String params) { + MSerDeInfo msdi = new MSerDeInfo(name, "serializationlib", stringToMap(params)); + pm.makePersistent(msdi); + return msdi; + } + + private RetrieverIterable 
getBlockRetriever(Class clazz, int blockSize, PersistenceManager pm) { + return new BlockRetrieverIterable<>(new BlockRetrieverIterator(new TestDataProvider<>(pm, clazz), + blockSize)); + } + + private String mapToString(Map m) { + Map sorted = new TreeMap<>(); + sorted.putAll(m); + StringBuilder sb = new StringBuilder(); + boolean isFirst = true; + for (Map.Entry it : sorted.entrySet()) { + if (isFirst) { + isFirst = false; + } else { + sb.append("|"); + } + sb.append(it.getKey()); + sb.append(" "); + sb.append(it.getValue()); + } + return sb.toString(); + } + + private Map stringToMap(String s) { + Map m = new HashMap<>(); + String[] splitted = s.split("\\|"); + for (int i = 0; i < splitted.length; ++i) { + String[] keyValue = splitted[i].split(" "); + m.put(keyValue[0], keyValue[1]); + } + return m; + } + + private static class TestDataProvider implements IDataProvider { + private final PersistenceManager pm; + private final Class clazz; + private Transaction currentTransaction; + + public TestDataProvider(PersistenceManager pm, Class clazz) { + this.pm = pm; + this.clazz = clazz; + } + + @Override + public boolean commitTransaction() { + currentTransaction.commit(); + return true; + } + + @Override + public void openTransaction() { + currentTransaction = pm.currentTransaction(); + currentTransaction.begin(); + } + + @Override + public void rollbackTransaction() { + currentTransaction.rollback(); + } + + @Override + public List getEntities(int from, int to) { + Query query = pm.newQuery(clazz); + System.out.println("DEBUG range: from=" + from + " to=" + to + "-1"); + query.setRange(from, to); + List entities = (List) query.execute(); + pm.retrieveAll(entities); + return entities; + } + } +}