Index: coding_style.xml
===================================================================
--- coding_style.xml (revision 1381792)
+++ coding_style.xml (working copy)
@@ -1,7 +1,7 @@
+ "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+ "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
-
-
+
+
+
-
-
-
+
+
+
-
-
-
-
+
+
+
+
-
+
-
-
-
-
-
-
+
+
+
+
+
+
-
+
+
+
+
+
Index: shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
===================================================================
--- shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java (revision 1381792)
+++ shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java (working copy)
@@ -49,20 +49,20 @@
@Override
public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf,
- org.apache.hadoop.mapreduce.TaskAttemptID taskId) {
+ org.apache.hadoop.mapreduce.TaskAttemptID taskId) {
return new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(conf, taskId);
}
@Override
public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf,
- org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) {
+ org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) {
org.apache.hadoop.mapred.TaskAttemptContext newContext = null;
try {
java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContextImpl.class.getDeclaredConstructor(
- org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class,
- Reporter.class);
+ org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class,
+ Reporter.class);
construct.setAccessible(true);
- newContext = (org.apache.hadoop.mapred.TaskAttemptContext)construct.newInstance(conf, taskId, (Reporter)progressable);
+ newContext = (org.apache.hadoop.mapred.TaskAttemptContext) construct.newInstance(conf, taskId, (Reporter) progressable);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -71,7 +71,7 @@
@Override
public JobContext createJobContext(Configuration conf,
- JobID jobId) {
+ JobID jobId) {
JobContext ctxt = new JobContextImpl(conf, jobId);
return ctxt;
@@ -79,15 +79,15 @@
@Override
public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf,
- org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) {
- org.apache.hadoop.mapred.JobContext newContext =
- new org.apache.hadoop.mapred.JobContextImpl(conf, jobId, (org.apache.hadoop.mapred.Reporter)progressable);
+ org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) {
+ org.apache.hadoop.mapred.JobContext newContext =
+ new org.apache.hadoop.mapred.JobContextImpl(conf, jobId, (org.apache.hadoop.mapred.Reporter) progressable);
return newContext;
}
@Override
public void commitJob(OutputFormat outputFormat, ResourceSchema schema,
- String arg1, Job job) throws IOException {
+ String arg1, Job job) throws IOException {
// Do nothing as this was fixed by MAPREDUCE-1447.
}
@@ -106,12 +106,12 @@
@Override
public String getPropertyName(PropertyName name) {
switch (name) {
- case CACHE_ARCHIVES:
- return MRJobConfig.CACHE_ARCHIVES;
- case CACHE_FILES:
- return MRJobConfig.CACHE_FILES;
- case CACHE_SYMLINK:
- return MRJobConfig.CACHE_SYMLINK;
+ case CACHE_ARCHIVES:
+ return MRJobConfig.CACHE_ARCHIVES;
+ case CACHE_FILES:
+ return MRJobConfig.CACHE_FILES;
+ case CACHE_SYMLINK:
+ return MRJobConfig.CACHE_SYMLINK;
}
return "";
Index: src/test/org/apache/hcatalog/HcatTestUtils.java
===================================================================
--- src/test/org/apache/hcatalog/HcatTestUtils.java (revision 1381792)
+++ src/test/org/apache/hcatalog/HcatTestUtils.java (working copy)
@@ -33,74 +33,74 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
+/**
* Utility methods for tests
*/
public class HcatTestUtils {
- private static final Logger LOG = LoggerFactory.getLogger(HcatTestUtils.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HcatTestUtils.class);
- public static FsPermission perm007 = FsPermission.createImmutable((short) 0007); // -------rwx
- public static FsPermission perm070 = FsPermission.createImmutable((short) 0070); // ----rwx---
- public static FsPermission perm700 = FsPermission.createImmutable((short) 0700); // -rwx------
- public static FsPermission perm755 = FsPermission.createImmutable((short) 0755); // -rwxr-xr-x
- public static FsPermission perm777 = FsPermission.createImmutable((short) 0777); // -rwxrwxrwx
- public static FsPermission perm300 = FsPermission.createImmutable((short) 0300); // --wx------
- public static FsPermission perm500 = FsPermission.createImmutable((short) 0500); // -r-x------
- public static FsPermission perm555 = FsPermission.createImmutable((short) 0555); // -r-xr-xr-x
-
- /**
- * Returns the database path.
- */
- public static Path getDbPath(Hive hive, Warehouse wh, String dbName) throws MetaException, HiveException {
- return wh.getDatabasePath(hive.getDatabase(dbName));
- }
-
- /**
- * Removes all databases and tables from the metastore
- */
- public static void cleanupHMS(Hive hive, Warehouse wh, FsPermission defaultPerm)
- throws HiveException, MetaException, NoSuchObjectException {
- for (String dbName : hive.getAllDatabases()) {
- if (dbName.equals("default")) {
- continue;
- }
- try {
- Path path = getDbPath(hive, wh, dbName);
- FileSystem whFs = path.getFileSystem(hive.getConf());
- whFs.setPermission(path, defaultPerm);
- } catch(IOException ex) {
- //ignore
- }
- hive.dropDatabase(dbName, true, true, true);
+ public static FsPermission perm007 = FsPermission.createImmutable((short) 0007); // -------rwx
+ public static FsPermission perm070 = FsPermission.createImmutable((short) 0070); // ----rwx---
+ public static FsPermission perm700 = FsPermission.createImmutable((short) 0700); // -rwx------
+ public static FsPermission perm755 = FsPermission.createImmutable((short) 0755); // -rwxr-xr-x
+ public static FsPermission perm777 = FsPermission.createImmutable((short) 0777); // -rwxrwxrwx
+ public static FsPermission perm300 = FsPermission.createImmutable((short) 0300); // --wx------
+ public static FsPermission perm500 = FsPermission.createImmutable((short) 0500); // -r-x------
+ public static FsPermission perm555 = FsPermission.createImmutable((short) 0555); // -r-xr-xr-x
+
+ /**
+ * Returns the database path.
+ */
+ public static Path getDbPath(Hive hive, Warehouse wh, String dbName) throws MetaException, HiveException {
+ return wh.getDatabasePath(hive.getDatabase(dbName));
}
-
- //clean tables in default db
- for (String tablename : hive.getAllTables("default")) {
- hive.dropTable("default", tablename, true, true);
+
+ /**
+ * Removes all databases and tables from the metastore
+ */
+ public static void cleanupHMS(Hive hive, Warehouse wh, FsPermission defaultPerm)
+ throws HiveException, MetaException, NoSuchObjectException {
+ for (String dbName : hive.getAllDatabases()) {
+ if (dbName.equals("default")) {
+ continue;
+ }
+ try {
+ Path path = getDbPath(hive, wh, dbName);
+ FileSystem whFs = path.getFileSystem(hive.getConf());
+ whFs.setPermission(path, defaultPerm);
+ } catch (IOException ex) {
+ //ignore
+ }
+ hive.dropDatabase(dbName, true, true, true);
+ }
+
+ //clean tables in default db
+ for (String tablename : hive.getAllTables("default")) {
+ hive.dropTable("default", tablename, true, true);
+ }
}
- }
- public static void createTestDataFile(String filename, String[] lines) throws IOException {
- FileWriter writer = null;
- try {
- File file = new File(filename);
- file.deleteOnExit();
- writer = new FileWriter(file);
- for (String line : lines) {
- writer.write(line + "\n");
- }
- } finally {
- if (writer != null) {
- writer.close();
- }
+ public static void createTestDataFile(String filename, String[] lines) throws IOException {
+ FileWriter writer = null;
+ try {
+ File file = new File(filename);
+ file.deleteOnExit();
+ writer = new FileWriter(file);
+ for (String line : lines) {
+ writer.write(line + "\n");
+ }
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
}
- }
-
- public static boolean isHadoop23() {
- String version = org.apache.hadoop.util.VersionInfo.getVersion();
- if (version.matches("\\b0\\.23\\..+\\b"))
- return true;
- return false;
- }
+ public static boolean isHadoop23() {
+ String version = org.apache.hadoop.util.VersionInfo.getVersion();
+ if (version.matches("\\b0\\.23\\..+\\b"))
+ return true;
+ return false;
+ }
}
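
Aside (illustrative, not part of the patch): a minimal sketch of how the helpers reformatted above are typically called from a test; the file path and sample rows are made up.

import java.io.IOException;
import org.apache.hcatalog.HcatTestUtils;

public class HcatTestUtilsSketch {
    public static void writeSampleInput() throws IOException {
        // createTestDataFile registers the file with deleteOnExit(),
        // so no explicit cleanup is needed here.
        HcatTestUtils.createTestDataFile("/tmp/hcat_sample_input.txt",
            new String[]{"1\tone", "2\ttwo"});
        if (HcatTestUtils.isHadoop23()) {
            // Hadoop 0.23 local mode behaves differently in places (see HCatMapReduceTest).
            System.out.println("running against Hadoop 0.23");
        }
    }
}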
Index: src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
===================================================================
--- src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java (revision 1381792)
+++ src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java (working copy)
@@ -46,11 +46,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- /**
- * TestRCFile.
- *
- */
- public class TestRCFileMapReduceInputFormat extends TestCase {
+/**
+ * TestRCFile.
+ *
+ */
+public class TestRCFileMapReduceInputFormat extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(TestRCFileMapReduceInputFormat.class);
@@ -65,18 +65,18 @@
private static Properties tbl;
static {
- try {
- fs = FileSystem.getLocal(conf);
- Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
- file = new Path(dir, "test_rcfile");
- fs.delete(dir, true);
- // the SerDe part is from TestLazySimpleSerDe
- serDe = new ColumnarSerDe();
- // Create the SerDe
- tbl = createProperties();
- serDe.initialize(conf, tbl);
- } catch (Exception e) {
- }
+ try {
+ fs = FileSystem.getLocal(conf);
+ Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
+ file = new Path(dir, "test_rcfile");
+ fs.delete(dir, true);
+ // the SerDe part is from TestLazySimpleSerDe
+ serDe = new ColumnarSerDe();
+ // Create the SerDe
+ tbl = createProperties();
+ serDe.initialize(conf, tbl);
+ } catch (Exception e) {
+ }
}
private static BytesRefArrayWritable patialS = new BytesRefArrayWritable();
@@ -84,164 +84,164 @@
private static byte[][] bytesArray = null;
private static BytesRefArrayWritable s = null;
+
static {
- try {
- bytesArray = new byte[][] {"123".getBytes("UTF-8"),
- "456".getBytes("UTF-8"), "789".getBytes("UTF-8"),
- "1000".getBytes("UTF-8"), "5.3".getBytes("UTF-8"),
- "hive and hadoop".getBytes("UTF-8"), new byte[0],
- "NULL".getBytes("UTF-8")};
- s = new BytesRefArrayWritable(bytesArray.length);
- s.set(0, new BytesRefWritable("123".getBytes("UTF-8")));
- s.set(1, new BytesRefWritable("456".getBytes("UTF-8")));
- s.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
- s.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
- s.set(4, new BytesRefWritable("5.3".getBytes("UTF-8")));
- s.set(5, new BytesRefWritable("hive and hadoop".getBytes("UTF-8")));
- s.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
- s.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ try {
+ bytesArray = new byte[][]{"123".getBytes("UTF-8"),
+ "456".getBytes("UTF-8"), "789".getBytes("UTF-8"),
+ "1000".getBytes("UTF-8"), "5.3".getBytes("UTF-8"),
+ "hive and hadoop".getBytes("UTF-8"), new byte[0],
+ "NULL".getBytes("UTF-8")};
+ s = new BytesRefArrayWritable(bytesArray.length);
+ s.set(0, new BytesRefWritable("123".getBytes("UTF-8")));
+ s.set(1, new BytesRefWritable("456".getBytes("UTF-8")));
+ s.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
+ s.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
+ s.set(4, new BytesRefWritable("5.3".getBytes("UTF-8")));
+ s.set(5, new BytesRefWritable("hive and hadoop".getBytes("UTF-8")));
+ s.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ s.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
- // partial test init
- patialS.set(0, new BytesRefWritable("NULL".getBytes("UTF-8")));
- patialS.set(1, new BytesRefWritable("NULL".getBytes("UTF-8")));
- patialS.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
- patialS.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
- patialS.set(4, new BytesRefWritable("NULL".getBytes("UTF-8")));
- patialS.set(5, new BytesRefWritable("NULL".getBytes("UTF-8")));
- patialS.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
- patialS.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ // partial test init
+ patialS.set(0, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ patialS.set(1, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ patialS.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
+ patialS.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
+ patialS.set(4, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ patialS.set(5, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ patialS.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ patialS.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
- } catch (UnsupportedEncodingException e) {
- }
+ } catch (UnsupportedEncodingException e) {
+ }
}
/** For debugging and testing. */
public static void main(String[] args) throws Exception {
- int count = 10000;
- boolean create = true;
+ int count = 10000;
+ boolean create = true;
- String usage = "Usage: RCFile " + "[-count N]" + " file";
- if (args.length == 0) {
- LOG.error(usage);
- System.exit(-1);
- }
-
- try {
- for (int i = 0; i < args.length; ++i) { // parse command line
- if (args[i] == null) {
- continue;
- } else if (args[i].equals("-count")) {
- count = Integer.parseInt(args[++i]);
- } else {
- // file is required parameter
- file = new Path(args[i]);
- }
+ String usage = "Usage: RCFile " + "[-count N]" + " file";
+ if (args.length == 0) {
+ LOG.error(usage);
+ System.exit(-1);
}
- if (file == null) {
- LOG.error(usage);
- System.exit(-1);
- }
+ try {
+ for (int i = 0; i < args.length; ++i) { // parse command line
+ if (args[i] == null) {
+ continue;
+ } else if (args[i].equals("-count")) {
+ count = Integer.parseInt(args[++i]);
+ } else {
+ // file is required parameter
+ file = new Path(args[i]);
+ }
+ }
- LOG.info("count = {}", count);
- LOG.info("create = {}", create);
- LOG.info("file = {}" ,file);
+ if (file == null) {
+ LOG.error(usage);
+ System.exit(-1);
+ }
- // test.performanceTest();
- LOG.info("Finished.");
- } finally {
- fs.close();
- }
+ LOG.info("count = {}", count);
+ LOG.info("create = {}", create);
+ LOG.info("file = {}", file);
+
+ // test.performanceTest();
+ LOG.info("Finished.");
+ } finally {
+ fs.close();
+ }
}
private static Properties createProperties() {
- Properties tbl = new Properties();
+ Properties tbl = new Properties();
- // Set the configuration parameters
- tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
- tbl.setProperty("columns",
- "abyte,ashort,aint,along,adouble,astring,anullint,anullstring");
- tbl.setProperty("columns.types",
- "tinyint:smallint:int:bigint:double:string:int:string");
- tbl.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
- return tbl;
+ // Set the configuration parameters
+ tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty("columns",
+ "abyte,ashort,aint,along,adouble,astring,anullint,anullstring");
+ tbl.setProperty("columns.types",
+ "tinyint:smallint:int:bigint:double:string:int:string");
+ tbl.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
+ return tbl;
}
-
public void testSynAndSplit() throws IOException, InterruptedException {
- splitBeforeSync();
- splitRightBeforeSync();
- splitInMiddleOfSync();
- splitRightAfterSync();
- splitAfterSync();
+ splitBeforeSync();
+ splitRightBeforeSync();
+ splitInMiddleOfSync();
+ splitRightAfterSync();
+ splitAfterSync();
}
- private void splitBeforeSync() throws IOException,InterruptedException {
- writeThenReadByRecordReader(600, 1000, 2, 17684, null);
+ private void splitBeforeSync() throws IOException, InterruptedException {
+ writeThenReadByRecordReader(600, 1000, 2, 17684, null);
}
- private void splitRightBeforeSync() throws IOException ,InterruptedException{
- writeThenReadByRecordReader(500, 1000, 2, 17750, null);
+ private void splitRightBeforeSync() throws IOException, InterruptedException {
+ writeThenReadByRecordReader(500, 1000, 2, 17750, null);
}
- private void splitInMiddleOfSync() throws IOException,InterruptedException {
- writeThenReadByRecordReader(500, 1000, 2, 17760, null);
+ private void splitInMiddleOfSync() throws IOException, InterruptedException {
+ writeThenReadByRecordReader(500, 1000, 2, 17760, null);
}
private void splitRightAfterSync() throws IOException, InterruptedException {
- writeThenReadByRecordReader(500, 1000, 2, 17770, null);
+ writeThenReadByRecordReader(500, 1000, 2, 17770, null);
}
- private void splitAfterSync() throws IOException ,InterruptedException{
- writeThenReadByRecordReader(500, 1000, 2, 19950, null);
+ private void splitAfterSync() throws IOException, InterruptedException {
+ writeThenReadByRecordReader(500, 1000, 2, 19950, null);
}
private void writeThenReadByRecordReader(int intervalRecordCount,
- int writeCount, int splitNumber, long maxSplitSize, CompressionCodec codec)
+ int writeCount, int splitNumber, long maxSplitSize, CompressionCodec codec)
throws IOException, InterruptedException {
- Path testDir = new Path(System.getProperty("test.data.dir", ".")
- + "/mapred/testsmallfirstsplit");
- Path testFile = new Path(testDir, "test_rcfile");
- fs.delete(testFile, true);
- Configuration cloneConf = new Configuration(conf);
- RCFileOutputFormat.setColumnNumber(cloneConf, bytesArray.length);
- cloneConf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, intervalRecordCount);
+ Path testDir = new Path(System.getProperty("test.data.dir", ".")
+ + "/mapred/testsmallfirstsplit");
+ Path testFile = new Path(testDir, "test_rcfile");
+ fs.delete(testFile, true);
+ Configuration cloneConf = new Configuration(conf);
+ RCFileOutputFormat.setColumnNumber(cloneConf, bytesArray.length);
+ cloneConf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, intervalRecordCount);
- RCFile.Writer writer = new RCFile.Writer(fs, cloneConf, testFile, null, codec);
+ RCFile.Writer writer = new RCFile.Writer(fs, cloneConf, testFile, null, codec);
- BytesRefArrayWritable bytes = new BytesRefArrayWritable(bytesArray.length);
- for (int i = 0; i < bytesArray.length; i++) {
- BytesRefWritable cu = null;
- cu = new BytesRefWritable(bytesArray[i], 0, bytesArray[i].length);
- bytes.set(i, cu);
- }
- for (int i = 0; i < writeCount; i++) {
- writer.append(bytes);
- }
- writer.close();
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(bytesArray.length);
+ for (int i = 0; i < bytesArray.length; i++) {
+ BytesRefWritable cu = null;
+ cu = new BytesRefWritable(bytesArray[i], 0, bytesArray[i].length);
+ bytes.set(i, cu);
+ }
+ for (int i = 0; i < writeCount; i++) {
+ writer.append(bytes);
+ }
+ writer.close();
- RCFileMapReduceInputFormat inputFormat = new RCFileMapReduceInputFormat();
- Configuration jonconf = new Configuration(cloneConf);
- jonconf.set("mapred.input.dir", testDir.toString());
- JobContext context = new Job(jonconf);
- context.getConfiguration().setLong("mapred.max.split.size",maxSplitSize);
- List<InputSplit> splits = inputFormat.getSplits(context);
- assertEquals("splits length should be " + splitNumber, splits.size(), splitNumber);
- int readCount = 0;
- for (int i = 0; i < splits.size(); i++) {
- TaskAttemptContext tac = HCatHadoopShims.Instance.get().createTaskAttemptContext(jonconf, new TaskAttemptID());
- RecordReader rr = inputFormat.createRecordReader(splits.get(i), tac);
- rr.initialize(splits.get(i), tac);
- while (rr.nextKeyValue()) {
- readCount++;
+ RCFileMapReduceInputFormat inputFormat = new RCFileMapReduceInputFormat();
+ Configuration jonconf = new Configuration(cloneConf);
+ jonconf.set("mapred.input.dir", testDir.toString());
+ JobContext context = new Job(jonconf);
+ context.getConfiguration().setLong("mapred.max.split.size", maxSplitSize);
+ List<InputSplit> splits = inputFormat.getSplits(context);
+ assertEquals("splits length should be " + splitNumber, splits.size(), splitNumber);
+ int readCount = 0;
+ for (int i = 0; i < splits.size(); i++) {
+ TaskAttemptContext tac = HCatHadoopShims.Instance.get().createTaskAttemptContext(jonconf, new TaskAttemptID());
+ RecordReader rr = inputFormat.createRecordReader(splits.get(i), tac);
+ rr.initialize(splits.get(i), tac);
+ while (rr.nextKeyValue()) {
+ readCount++;
+ }
}
- }
- assertEquals("readCount should be equal to writeCount", readCount, writeCount);
+ assertEquals("readCount should be equal to writeCount", readCount, writeCount);
}
- }
+}
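
Aside (illustrative, not part of the patch): writeThenReadByRecordReader above fixes the RCFile sync interval when writing and then bounds the split size when reading to force a known split count. The sketch below shows only the two read-side configuration keys the test sets, with made-up values supplied by the caller:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SplitConfigSketch {
    // Mirrors the keys the test sets: the input directory and the per-split
    // upper bound that determines how many splits getSplits() returns.
    public static Job configure(Configuration conf, String inputDir,
                                long maxSplitSize) throws IOException {
        Job job = new Job(conf);
        job.getConfiguration().set("mapred.input.dir", inputDir);
        job.getConfiguration().setLong("mapred.max.split.size", maxSplitSize);
        return job;
    }
}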
Index: src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
===================================================================
--- src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (revision 1381792)
+++ src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (working copy)
@@ -43,80 +43,80 @@
public class TestMsgBusConnection extends TestCase {
- private Driver driver;
- private BrokerService broker;
- private MessageConsumer consumer;
+ private Driver driver;
+ private BrokerService broker;
+ private MessageConsumer consumer;
- @Override
- protected void setUp() throws Exception {
+ @Override
+ protected void setUp() throws Exception {
- super.setUp();
- broker = new BrokerService();
- // configure the broker
- broker.addConnector("tcp://localhost:61616?broker.persistent=false");
+ super.setUp();
+ broker = new BrokerService();
+ // configure the broker
+ broker.addConnector("tcp://localhost:61616?broker.persistent=false");
- broker.start();
+ broker.start();
- System.setProperty("java.naming.factory.initial",
- "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- System.setProperty("java.naming.provider.url", "tcp://localhost:61616");
- connectClient();
- HiveConf hiveConf = new HiveConf(this.getClass());
- hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
- NotificationListener.class.getName());
- hiveConf.set("hive.metastore.local", "true");
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
- hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat");
- SessionState.start(new CliSessionState(hiveConf));
- driver = new Driver(hiveConf);
- }
+ System.setProperty("java.naming.factory.initial",
+ "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ System.setProperty("java.naming.provider.url", "tcp://localhost:61616");
+ connectClient();
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
+ NotificationListener.class.getName());
+ hiveConf.set("hive.metastore.local", "true");
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat");
+ SessionState.start(new CliSessionState(hiveConf));
+ driver = new Driver(hiveConf);
+ }
- private void connectClient() throws JMSException {
- ConnectionFactory connFac = new ActiveMQConnectionFactory(
- "tcp://localhost:61616");
- Connection conn = connFac.createConnection();
- conn.start();
- Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
- Destination hcatTopic = session.createTopic("planetlab.hcat");
- consumer = session.createConsumer(hcatTopic);
- }
+ private void connectClient() throws JMSException {
+ ConnectionFactory connFac = new ActiveMQConnectionFactory(
+ "tcp://localhost:61616");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Destination hcatTopic = session.createTopic("planetlab.hcat");
+ consumer = session.createConsumer(hcatTopic);
+ }
- public void testConnection() throws Exception {
+ public void testConnection() throws Exception {
- try {
- driver.run("create database testconndb");
- Message msg = consumer.receive();
- assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
- msg.getStringProperty(HCatConstants.HCAT_EVENT));
- assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
- assertEquals("testconndb",
- ((Database) ((ObjectMessage) msg).getObject()).getName());
- broker.stop();
- driver.run("drop database testconndb cascade");
- broker.start(true);
- connectClient();
- driver.run("create database testconndb");
- msg = consumer.receive();
- assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
- msg.getStringProperty(HCatConstants.HCAT_EVENT));
- assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
- assertEquals("testconndb",
- ((Database) ((ObjectMessage) msg).getObject()).getName());
- driver.run("drop database testconndb cascade");
- msg = consumer.receive();
- assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT,
- msg.getStringProperty(HCatConstants.HCAT_EVENT));
- assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
- assertEquals("testconndb",
- ((Database) ((ObjectMessage) msg).getObject()).getName());
- } catch (NoSuchObjectException nsoe) {
- nsoe.printStackTrace(System.err);
- assert false;
- } catch (AlreadyExistsException aee) {
- aee.printStackTrace(System.err);
- assert false;
+ try {
+ driver.run("create database testconndb");
+ Message msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
+ msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
+ assertEquals("testconndb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ broker.stop();
+ driver.run("drop database testconndb cascade");
+ broker.start(true);
+ connectClient();
+ driver.run("create database testconndb");
+ msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
+ msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
+ assertEquals("testconndb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ driver.run("drop database testconndb cascade");
+ msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT,
+ msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
+ assertEquals("testconndb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ } catch (NoSuchObjectException nsoe) {
+ nsoe.printStackTrace(System.err);
+ assert false;
+ } catch (AlreadyExistsException aee) {
+ aee.printStackTrace(System.err);
+ assert false;
+ }
}
- }
}
Index: src/test/org/apache/hcatalog/listener/TestNotificationListener.java
===================================================================
--- src/test/org/apache/hcatalog/listener/TestNotificationListener.java (revision 1381792)
+++ src/test/org/apache/hcatalog/listener/TestNotificationListener.java (working copy)
@@ -59,136 +59,136 @@
import junit.framework.TestCase;
public class TestNotificationListener extends TestCase implements
- MessageListener {
+ MessageListener {
- private HiveConf hiveConf;
- private Driver driver;
- private AtomicInteger cntInvocation = new AtomicInteger(0);
+ private HiveConf hiveConf;
+ private Driver driver;
+ private AtomicInteger cntInvocation = new AtomicInteger(0);
- @Override
- protected void setUp() throws Exception {
+ @Override
+ protected void setUp() throws Exception {
- super.setUp();
- System.setProperty("java.naming.factory.initial",
- "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- System.setProperty("java.naming.provider.url",
- "vm://localhost?broker.persistent=false");
- ConnectionFactory connFac = new ActiveMQConnectionFactory(
- "vm://localhost?broker.persistent=false");
- Connection conn = connFac.createConnection();
- conn.start();
- // We want message to be sent when session commits, thus we run in
- // transacted mode.
- Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
- Destination hcatTopic = session
- .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
- MessageConsumer consumer1 = session.createConsumer(hcatTopic);
- consumer1.setMessageListener(this);
- Destination tblTopic = session
- .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl");
- MessageConsumer consumer2 = session.createConsumer(tblTopic);
- consumer2.setMessageListener(this);
- Destination dbTopic = session
- .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb");
- MessageConsumer consumer3 = session.createConsumer(dbTopic);
- consumer3.setMessageListener(this);
- hiveConf = new HiveConf(this.getClass());
- hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
- NotificationListener.class.getName());
- hiveConf.set("hive.metastore.local", "true");
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
- SessionState.start(new CliSessionState(hiveConf));
- driver = new Driver(hiveConf);
- }
+ super.setUp();
+ System.setProperty("java.naming.factory.initial",
+ "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ System.setProperty("java.naming.provider.url",
+ "vm://localhost?broker.persistent=false");
+ ConnectionFactory connFac = new ActiveMQConnectionFactory(
+ "vm://localhost?broker.persistent=false");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Destination hcatTopic = session
+ .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
+ MessageConsumer consumer1 = session.createConsumer(hcatTopic);
+ consumer1.setMessageListener(this);
+ Destination tblTopic = session
+ .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl");
+ MessageConsumer consumer2 = session.createConsumer(tblTopic);
+ consumer2.setMessageListener(this);
+ Destination dbTopic = session
+ .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb");
+ MessageConsumer consumer3 = session.createConsumer(dbTopic);
+ consumer3.setMessageListener(this);
+ hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
+ NotificationListener.class.getName());
+ hiveConf.set("hive.metastore.local", "true");
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ SessionState.start(new CliSessionState(hiveConf));
+ driver = new Driver(hiveConf);
+ }
- @Override
- protected void tearDown() throws Exception {
- assertEquals(7, cntInvocation.get());
- super.tearDown();
- }
+ @Override
+ protected void tearDown() throws Exception {
+ assertEquals(7, cntInvocation.get());
+ super.tearDown();
+ }
- public void testAMQListener() throws MetaException, TException,
- UnknownTableException, NoSuchObjectException, CommandNeedRetryException,
- UnknownDBException, InvalidPartitionException, UnknownPartitionException {
- driver.run("create database mydb");
- driver.run("use mydb");
- driver.run("create table mytbl (a string) partitioned by (b string)");
- driver.run("alter table mytbl add partition(b='2011')");
- HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
- Map<String, String> kvs = new HashMap<String, String>(1);
- kvs.put("b", "2011");
- msc.markPartitionForEvent("mydb", "mytbl", kvs,
- PartitionEventType.LOAD_DONE);
- driver.run("alter table mytbl drop partition(b='2011')");
- driver.run("drop table mytbl");
- driver.run("drop database mydb");
- }
+ public void testAMQListener() throws MetaException, TException,
+ UnknownTableException, NoSuchObjectException, CommandNeedRetryException,
+ UnknownDBException, InvalidPartitionException, UnknownPartitionException {
+ driver.run("create database mydb");
+ driver.run("use mydb");
+ driver.run("create table mytbl (a string) partitioned by (b string)");
+ driver.run("alter table mytbl add partition(b='2011')");
+ HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
+ Map<String, String> kvs = new HashMap<String, String>(1);
+ kvs.put("b", "2011");
+ msc.markPartitionForEvent("mydb", "mytbl", kvs,
+ PartitionEventType.LOAD_DONE);
+ driver.run("alter table mytbl drop partition(b='2011')");
+ driver.run("drop table mytbl");
+ driver.run("drop database mydb");
+ }
- @Override
- public void onMessage(Message msg) {
- cntInvocation.incrementAndGet();
+ @Override
+ public void onMessage(Message msg) {
+ cntInvocation.incrementAndGet();
- String event;
- try {
- event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
- if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) {
+ String event;
+ try {
+ event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
+ if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) {
- assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
- .getJMSDestination().toString());
- assertEquals("mydb",
- ((Database) ((ObjectMessage) msg).getObject()).getName());
- } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) {
+ assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
+ .getJMSDestination().toString());
+ assertEquals("mydb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) {
- assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
- Table tbl = (Table) (((ObjectMessage) msg).getObject());
- assertEquals("mytbl", tbl.getTableName());
- assertEquals("mydb", tbl.getDbName());
- assertEquals(1, tbl.getPartitionKeysSize());
- } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) {
+ assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
+ Table tbl = (Table) (((ObjectMessage) msg).getObject());
+ assertEquals("mytbl", tbl.getTableName());
+ assertEquals("mydb", tbl.getDbName());
+ assertEquals(1, tbl.getPartitionKeysSize());
+ } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) {
- assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
- .toString());
- Partition part = (Partition) (((ObjectMessage) msg).getObject());
- assertEquals("mytbl", part.getTableName());
- assertEquals("mydb", part.getDbName());
- List<String> vals = new ArrayList<String>(1);
- vals.add("2011");
- assertEquals(vals, part.getValues());
- } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) {
+ assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ .toString());
+ Partition part = (Partition) (((ObjectMessage) msg).getObject());
+ assertEquals("mytbl", part.getTableName());
+ assertEquals("mydb", part.getDbName());
+ List<String> vals = new ArrayList<String>(1);
+ vals.add("2011");
+ assertEquals(vals, part.getValues());
+ } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) {
- assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
- .toString());
- Partition part = (Partition) (((ObjectMessage) msg).getObject());
- assertEquals("mytbl", part.getTableName());
- assertEquals("mydb", part.getDbName());
- List<String> vals = new ArrayList<String>(1);
- vals.add("2011");
- assertEquals(vals, part.getValues());
- } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) {
+ assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ .toString());
+ Partition part = (Partition) (((ObjectMessage) msg).getObject());
+ assertEquals("mytbl", part.getTableName());
+ assertEquals("mydb", part.getDbName());
+ List<String> vals = new ArrayList<String>(1);
+ vals.add("2011");
+ assertEquals(vals, part.getValues());
+ } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) {
- assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
- Table tbl = (Table) (((ObjectMessage) msg).getObject());
- assertEquals("mytbl", tbl.getTableName());
- assertEquals("mydb", tbl.getDbName());
- assertEquals(1, tbl.getPartitionKeysSize());
- } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) {
+ assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
+ Table tbl = (Table) (((ObjectMessage) msg).getObject());
+ assertEquals("mytbl", tbl.getTableName());
+ assertEquals("mydb", tbl.getDbName());
+ assertEquals(1, tbl.getPartitionKeysSize());
+ } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) {
- assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
- .getJMSDestination().toString());
- assertEquals("mydb",
- ((Database) ((ObjectMessage) msg).getObject()).getName());
- } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
- assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
- .toString());
- MapMessage mapMsg = (MapMessage) msg;
- assert mapMsg.getString("b").equals("2011");
- } else
- assert false;
- } catch (JMSException e) {
- e.printStackTrace(System.err);
- assert false;
+ assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
+ .getJMSDestination().toString());
+ assertEquals("mydb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
+ assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ .toString());
+ MapMessage mapMsg = (MapMessage) msg;
+ assert mapMsg.getString("b").equals("2011");
+ } else
+ assert false;
+ } catch (JMSException e) {
+ e.printStackTrace(System.err);
+ assert false;
+ }
}
- }
}
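
Aside (illustrative, not part of the patch): tearDown() above expects exactly 7 callbacks because every DDL step in testAMQListener() fires one notification. A sketch of the expected sequence, assuming HCatConstants lives in org.apache.hcatalog.common as elsewhere in HCatalog:

import org.apache.hcatalog.common.HCatConstants;

public class ExpectedEventsSketch {
    static final String[] EXPECTED = {
        HCatConstants.HCAT_ADD_DATABASE_EVENT,    // create database mydb
        HCatConstants.HCAT_ADD_TABLE_EVENT,       // create table mytbl
        HCatConstants.HCAT_ADD_PARTITION_EVENT,   // alter table mytbl add partition
        HCatConstants.HCAT_PARTITION_DONE_EVENT,  // markPartitionForEvent(LOAD_DONE)
        HCatConstants.HCAT_DROP_PARTITION_EVENT,  // alter table mytbl drop partition
        HCatConstants.HCAT_DROP_TABLE_EVENT,      // drop table mytbl
        HCatConstants.HCAT_DROP_DATABASE_EVENT,   // drop database mydb
    };  // EXPECTED.length == 7, matching assertEquals(7, cntInvocation.get())
}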
Index: src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (revision 1381792)
+++ src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (working copy)
@@ -68,285 +68,284 @@
*/
public abstract class HCatMapReduceTest extends TestCase {
- private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class);
- protected String dbName = "default";
- protected String tableName = "testHCatMapReduceTable";
+ private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class);
+ protected String dbName = "default";
+ protected String tableName = "testHCatMapReduceTable";
- protected String inputFormat = RCFileInputFormat.class.getName();
- protected String outputFormat = RCFileOutputFormat.class.getName();
- protected String serdeClass = ColumnarSerDe.class.getName();
+ protected String inputFormat = RCFileInputFormat.class.getName();
+ protected String outputFormat = RCFileOutputFormat.class.getName();
+ protected String serdeClass = ColumnarSerDe.class.getName();
- private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
- private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
+ private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
+ private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
- protected abstract void initialize() throws Exception;
+ protected abstract void initialize() throws Exception;
- protected abstract List<FieldSchema> getPartitionKeys();
+ protected abstract List<FieldSchema> getPartitionKeys();
- protected abstract List<FieldSchema> getTableColumns();
+ protected abstract List<FieldSchema> getTableColumns();
- private HiveMetaStoreClient client;
- protected HiveConf hiveConf;
+ private HiveMetaStoreClient client;
+ protected HiveConf hiveConf;
- private FileSystem fs;
- private String thriftUri = null;
+ private FileSystem fs;
+ private String thriftUri = null;
- protected Driver driver;
+ protected Driver driver;
- @Override
- protected void setUp() throws Exception {
- hiveConf = new HiveConf(this.getClass());
+ @Override
+ protected void setUp() throws Exception {
+ hiveConf = new HiveConf(this.getClass());
- //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
- //is present only in the ql/test directory
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
- driver = new Driver(hiveConf);
- SessionState.start(new CliSessionState(hiveConf));
+ //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
+ //is present only in the ql/test directory
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
- thriftUri = System.getenv("HCAT_METASTORE_URI");
+ thriftUri = System.getenv("HCAT_METASTORE_URI");
- if( thriftUri != null ) {
- LOG.info("Using URI {}", thriftUri);
+ if (thriftUri != null) {
+ LOG.info("Using URI {}", thriftUri);
- hiveConf.set("hive.metastore.local", "false");
- hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
- }
+ hiveConf.set("hive.metastore.local", "false");
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
+ }
- fs = new LocalFileSystem();
- fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+ fs = new LocalFileSystem();
+ fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
- initialize();
+ initialize();
- client = new HiveMetaStoreClient(hiveConf, null);
- initTable();
- }
+ client = new HiveMetaStoreClient(hiveConf, null);
+ initTable();
+ }
- @Override
- protected void tearDown() throws Exception {
- try {
- String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+ @Override
+ protected void tearDown() throws Exception {
+ try {
+ String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
- client.dropTable(databaseName, tableName);
- } catch(Exception e) {
- e.printStackTrace();
- throw e;
+ client.dropTable(databaseName, tableName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+
+ client.close();
}
- client.close();
- }
+ private void initTable() throws Exception {
+ String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
- private void initTable() throws Exception {
+ try {
+ client.dropTable(databaseName, tableName);
+ } catch (Exception e) {
+ } //can fail with NoSuchObjectException
- String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
- try {
- client.dropTable(databaseName, tableName);
- } catch(Exception e) {
- } //can fail with NoSuchObjectException
+ Table tbl = new Table();
+ tbl.setDbName(databaseName);
+ tbl.setTableName(tableName);
+ tbl.setTableType("MANAGED_TABLE");
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(getTableColumns());
+ tbl.setPartitionKeys(getPartitionKeys());
- Table tbl = new Table();
- tbl.setDbName(databaseName);
- tbl.setTableName(tableName);
- tbl.setTableType("MANAGED_TABLE");
- StorageDescriptor sd = new StorageDescriptor();
+ tbl.setSd(sd);
- sd.setCols(getTableColumns());
- tbl.setPartitionKeys(getPartitionKeys());
+ sd.setBucketCols(new ArrayList<String>(2));
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().setSerializationLib(serdeClass);
+ sd.setInputFormat(inputFormat);
+ sd.setOutputFormat(outputFormat);
- tbl.setSd(sd);
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tbl.setParameters(tableParams);
- sd.setBucketCols(new ArrayList<String>(2));
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setName(tbl.getTableName());
- sd.getSerdeInfo().setParameters(new HashMap<String, String>());
- sd.getSerdeInfo().getParameters().put(
- org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
- sd.getSerdeInfo().setSerializationLib(serdeClass);
- sd.setInputFormat(inputFormat);
- sd.setOutputFormat(outputFormat);
+ client.createTable(tbl);
+ }
- Map<String, String> tableParams = new HashMap<String, String>();
- tbl.setParameters(tableParams);
+ //Create test input file with specified number of rows
+ private void createInputFile(Path path, int rowCount) throws IOException {
- client.createTable(tbl);
- }
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
- //Create test input file with specified number of rows
- private void createInputFile(Path path, int rowCount) throws IOException {
+ FSDataOutputStream os = fs.create(path);
- if( fs.exists(path) ) {
- fs.delete(path, true);
- }
+ for (int i = 0; i < rowCount; i++) {
+ os.writeChars(i + "\n");
+ }
- FSDataOutputStream os = fs.create(path);
-
- for(int i = 0;i < rowCount;i++) {
- os.writeChars(i + "\n");
+ os.close();
}
- os.close();
- }
+ public static class MapCreate extends
+ Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
- public static class MapCreate extends
- Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+ static int writeCount = 0; //test will be in local mode
- static int writeCount = 0; //test will be in local mode
+ @Override
+ public void map(LongWritable key, Text value, Context context
+ ) throws IOException, InterruptedException {
+ {
+ try {
+ HCatRecord rec = writeRecords.get(writeCount);
+ context.write(null, rec);
+ writeCount++;
- @Override
- public void map(LongWritable key, Text value, Context context
- ) throws IOException, InterruptedException {
- {
- try {
- HCatRecord rec = writeRecords.get(writeCount);
- context.write(null, rec);
- writeCount++;
+ } catch (Exception e) {
- }catch(Exception e) {
-
- e.printStackTrace(System.err); //print since otherwise exception is lost
- throw new IOException(e);
+ e.printStackTrace(System.err); //print since otherwise exception is lost
+ throw new IOException(e);
+ }
+ }
}
- }
}
- }
- public static class MapRead extends
- Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
+ public static class MapRead extends
+ Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
- static int readCount = 0; //test will be in local mode
+ static int readCount = 0; //test will be in local mode
- @Override
- public void map(WritableComparable key, HCatRecord value, Context context
- ) throws IOException, InterruptedException {
- {
- try {
- readRecords.add(value);
- readCount++;
- } catch(Exception e) {
- e.printStackTrace(); //print since otherwise exception is lost
- throw new IOException(e);
+ @Override
+ public void map(WritableComparable key, HCatRecord value, Context context
+ ) throws IOException, InterruptedException {
+ {
+ try {
+ readRecords.add(value);
+ readCount++;
+ } catch (Exception e) {
+ e.printStackTrace(); //print since otherwise exception is lost
+ throw new IOException(e);
+ }
+ }
}
- }
}
- }
- Job runMRCreate(Map<String, String> partitionValues,
- List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
- int writeCount, boolean assertWrite) throws Exception {
+ Job runMRCreate(Map<String, String> partitionValues,
+ List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
+ int writeCount, boolean assertWrite) throws Exception {
- writeRecords = records;
- MapCreate.writeCount = 0;
+ writeRecords = records;
+ MapCreate.writeCount = 0;
- Configuration conf = new Configuration();
- Job job = new Job(conf, "hcat mapreduce write test");
- job.setJarByClass(this.getClass());
- job.setMapperClass(HCatMapReduceTest.MapCreate.class);
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "hcat mapreduce write test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(HCatMapReduceTest.MapCreate.class);
- // input/output settings
- job.setInputFormatClass(TextInputFormat.class);
+ // input/output settings
+ job.setInputFormatClass(TextInputFormat.class);
- Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
- createInputFile(path, writeCount);
+ Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+ createInputFile(path, writeCount);
- TextInputFormat.setInputPaths(job, path);
+ TextInputFormat.setInputPaths(job, path);
- job.setOutputFormatClass(HCatOutputFormat.class);
+ job.setOutputFormatClass(HCatOutputFormat.class);
- OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
- HCatOutputFormat.setOutput(job, outputJobInfo);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
+ HCatOutputFormat.setOutput(job, outputJobInfo);
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(DefaultHCatRecord.class);
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(DefaultHCatRecord.class);
- job.setNumReduceTasks(0);
+ job.setNumReduceTasks(0);
- HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
+ HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
- boolean success = job.waitForCompletion(true);
+ boolean success = job.waitForCompletion(true);
- // Ensure counters are set when data has actually been read.
- if (partitionValues != null) {
- assertTrue(job.getCounters().getGroup("FileSystemCounters")
- .findCounter("FILE_BYTES_READ").getValue() > 0);
- }
+ // Ensure counters are set when data has actually been read.
+ if (partitionValues != null) {
+ assertTrue(job.getCounters().getGroup("FileSystemCounters")
+ .findCounter("FILE_BYTES_READ").getValue() > 0);
+ }
- if (!HcatTestUtils.isHadoop23()) {
- // Local mode outputcommitter hook is not invoked in Hadoop 1.x
- if (success) {
- new FileOutputCommitterContainer(job,null).commitJob(job);
- } else {
- new FileOutputCommitterContainer(job,null).abortJob(job, JobStatus.State.FAILED);
+ if (!HcatTestUtils.isHadoop23()) {
+ // Local mode outputcommitter hook is not invoked in Hadoop 1.x
+ if (success) {
+ new FileOutputCommitterContainer(job, null).commitJob(job);
+ } else {
+ new FileOutputCommitterContainer(job, null).abortJob(job, JobStatus.State.FAILED);
+ }
}
+ if (assertWrite) {
+ // we assert only if we expected to assert with this call.
+ Assert.assertEquals(writeCount, MapCreate.writeCount);
+ }
+
+ return job;
}
- if (assertWrite){
- // we assert only if we expected to assert with this call.
- Assert.assertEquals(writeCount, MapCreate.writeCount);
+
+ List<HCatRecord> runMRRead(int readCount) throws Exception {
+ return runMRRead(readCount, null);
}
- return job;
- }
+ List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
- List<HCatRecord> runMRRead(int readCount) throws Exception {
- return runMRRead(readCount, null);
- }
+ MapRead.readCount = 0;
+ readRecords.clear();
- List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "hcat mapreduce read test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(HCatMapReduceTest.MapRead.class);
- MapRead.readCount = 0;
- readRecords.clear();
+ // input/output settings
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
- Configuration conf = new Configuration();
- Job job = new Job(conf, "hcat mapreduce read test");
- job.setJarByClass(this.getClass());
- job.setMapperClass(HCatMapReduceTest.MapRead.class);
+ InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, filter);
+ HCatInputFormat.setInput(job, inputJobInfo);
- // input/output settings
- job.setInputFormatClass(HCatInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Text.class);
- InputJobInfo inputJobInfo = InputJobInfo.create(dbName,tableName,filter);
- HCatInputFormat.setInput(job, inputJobInfo);
+ job.setNumReduceTasks(0);
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(Text.class);
+ Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceOutput");
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
- job.setNumReduceTasks(0);
+ TextOutputFormat.setOutputPath(job, path);
- Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceOutput");
- if( fs.exists(path) ) {
- fs.delete(path, true);
+ job.waitForCompletion(true);
+ Assert.assertEquals(readCount, MapRead.readCount);
+
+ return readRecords;
}
- TextOutputFormat.setOutputPath(job, path);
- job.waitForCompletion(true);
- Assert.assertEquals(readCount, MapRead.readCount);
+ protected HCatSchema getTableSchema() throws Exception {
- return readRecords;
- }
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "hcat mapreduce read schema test");
+ job.setJarByClass(this.getClass());
+ // input/output settings
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
- protected HCatSchema getTableSchema() throws Exception {
+ InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, null);
+ HCatInputFormat.setInput(job, inputJobInfo);
- Configuration conf = new Configuration();
- Job job = new Job(conf, "hcat mapreduce read schema test");
- job.setJarByClass(this.getClass());
+ return HCatInputFormat.getTableSchema(job);
+ }
- // input/output settings
- job.setInputFormatClass(HCatInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
-
- InputJobInfo inputJobInfo = InputJobInfo.create(dbName,tableName,null);
- HCatInputFormat.setInput(job, inputJobInfo);
-
- return HCatInputFormat.getTableSchema(job);
- }
-
}
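
Aside (illustrative, not part of the patch): HCatMapReduceTest is abstract, so each storage-format test supplies the table layout through the three abstract methods. A minimal sketch of a concrete subclass follows; the column names and types are invented, and List<FieldSchema> as the return type is an assumption based on the setCols()/setPartitionKeys() calls in initTable().

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hcatalog.mapreduce.HCatMapReduceTest;

public class TableLayoutSketch extends HCatMapReduceTest {
    @Override
    protected void initialize() throws Exception {
        // no extra setup needed for this sketch
    }

    @Override
    protected List<FieldSchema> getPartitionKeys() {
        List<FieldSchema> keys = new ArrayList<FieldSchema>();
        keys.add(new FieldSchema("part1", "string", ""));
        return keys;
    }

    @Override
    protected List<FieldSchema> getTableColumns() {
        List<FieldSchema> cols = new ArrayList<FieldSchema>();
        cols.add(new FieldSchema("c1", "int", ""));
        cols.add(new FieldSchema("c2", "string", ""));
        return cols;
    }
}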
Index: src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java (revision 1381792)
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java (working copy)
@@ -39,77 +39,77 @@
public class TestHCatHiveThriftCompatibility extends HCatBaseTest {
- private boolean setUpComplete = false;
- private Path intStringSeq;
+ private boolean setUpComplete = false;
+ private Path intStringSeq;
- @Before
- @Override
- public void setUp() throws Exception {
- super.setUp();
- if (setUpComplete) {
- return;
- }
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ if (setUpComplete) {
+ return;
+ }
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- TIOStreamTransport transport = new TIOStreamTransport(out);
- TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ TIOStreamTransport transport = new TIOStreamTransport(out);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
- IntString intString = new IntString(1, "one", 1);
- intString.write(protocol);
- BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+ IntString intString = new IntString(1, "one", 1);
+ intString.write(protocol);
+ BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
- intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
- LOG.info("Creating data file: " + intStringSeq);
+ intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
+ LOG.info("Creating data file: " + intStringSeq);
- SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
- intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
- NullWritable.class, BytesWritable.class);
- seqFileWriter.append(NullWritable.get(), bytesWritable);
- seqFileWriter.close();
+ SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+ intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+ NullWritable.class, BytesWritable.class);
+ seqFileWriter.append(NullWritable.get(), bytesWritable);
+ seqFileWriter.close();
- setUpComplete = true;
- }
+ setUpComplete = true;
+ }
- /**
- * Create a table with no explicit schema and ensure it's correctly
- * discovered from the thrift struct.
- */
- @Test
- public void testDynamicCols() throws Exception {
- Assert.assertEquals(0, driver.run("drop table if exists test_thrift").getResponseCode());
- Assert.assertEquals(0, driver.run(
- "create external table test_thrift " +
- "partitioned by (year string) " +
- "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
- "with serdeproperties ( " +
- " 'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
- " 'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
- "stored as" +
- " inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
- " outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
- .getResponseCode());
- Assert.assertEquals(0,
- driver.run("alter table test_thrift add partition (year = '2012') location '" +
- intStringSeq.getParent() + "'").getResponseCode());
+ /**
+ * Create a table with no explicit schema and ensure it's correctly
+ * discovered from the thrift struct.
+ */
+ @Test
+ public void testDynamicCols() throws Exception {
+ Assert.assertEquals(0, driver.run("drop table if exists test_thrift").getResponseCode());
+ Assert.assertEquals(0, driver.run(
+ "create external table test_thrift " +
+ "partitioned by (year string) " +
+ "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+ "with serdeproperties ( " +
+ " 'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+ " 'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+ "stored as" +
+ " inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+ " outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+ .getResponseCode());
+ Assert.assertEquals(0,
+ driver.run("alter table test_thrift add partition (year = '2012') location '" +
+ intStringSeq.getParent() + "'").getResponseCode());
- PigServer pigServer = new PigServer(ExecType.LOCAL);
- pigServer.registerQuery("A = load 'test_thrift' using org.apache.hcatalog.pig.HCatLoader();");
+ PigServer pigServer = new PigServer(ExecType.LOCAL);
+ pigServer.registerQuery("A = load 'test_thrift' using org.apache.hcatalog.pig.HCatLoader();");
- Schema expectedSchema = new Schema();
- expectedSchema.add(new Schema.FieldSchema("myint", DataType.INTEGER));
- expectedSchema.add(new Schema.FieldSchema("mystring", DataType.CHARARRAY));
- expectedSchema.add(new Schema.FieldSchema("underscore_int", DataType.INTEGER));
- expectedSchema.add(new Schema.FieldSchema("year", DataType.CHARARRAY));
+ Schema expectedSchema = new Schema();
+ expectedSchema.add(new Schema.FieldSchema("myint", DataType.INTEGER));
+ expectedSchema.add(new Schema.FieldSchema("mystring", DataType.CHARARRAY));
+ expectedSchema.add(new Schema.FieldSchema("underscore_int", DataType.INTEGER));
+ expectedSchema.add(new Schema.FieldSchema("year", DataType.CHARARRAY));
- Assert.assertEquals(expectedSchema, pigServer.dumpSchema("A"));
+ Assert.assertEquals(expectedSchema, pigServer.dumpSchema("A"));
- Iterator<Tuple> iterator = pigServer.openIterator("A");
- Tuple t = iterator.next();
- Assert.assertEquals(1, t.get(0));
- Assert.assertEquals("one", t.get(1));
- Assert.assertEquals(1, t.get(2));
- Assert.assertEquals("2012", t.get(3));
+ Iterator<Tuple> iterator = pigServer.openIterator("A");
+ Tuple t = iterator.next();
+ Assert.assertEquals(1, t.get(0));
+ Assert.assertEquals("one", t.get(1));
+ Assert.assertEquals(1, t.get(2));
+ Assert.assertEquals("2012", t.get(3));
- Assert.assertFalse(iterator.hasNext());
- }
+ Assert.assertFalse(iterator.hasNext());
+ }
}
Index: src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java (revision 1381792)
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java (working copy)
@@ -41,107 +41,107 @@
public class TestHCatInputFormat extends HCatBaseTest {
- private boolean setUpComplete = false;
+ private boolean setUpComplete = false;
- /**
- * Create an input sequence file with 100 records; every 10th record is bad.
- * Load this table into Hive.
- */
- @Before
- @Override
- public void setUp() throws Exception {
- super.setUp();
- if (setUpComplete) {
- return;
- }
+ /**
+ * Create an input sequence file with 100 records; every 10th record is bad.
+ * Load this table into Hive.
+ */
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ if (setUpComplete) {
+ return;
+ }
- Path intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
- LOG.info("Creating data file: " + intStringSeq);
- SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
- intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
- NullWritable.class, BytesWritable.class);
+ Path intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
+ LOG.info("Creating data file: " + intStringSeq);
+ SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+ intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+ NullWritable.class, BytesWritable.class);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- TIOStreamTransport transport = new TIOStreamTransport(out);
- TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ TIOStreamTransport transport = new TIOStreamTransport(out);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
- for (int i = 1; i <= 100; i++) {
- if (i % 10 == 0) {
- seqFileWriter.append(NullWritable.get(), new BytesWritable("bad record".getBytes()));
- } else {
- out.reset();
- IntString intString = new IntString(i, Integer.toString(i), i);
- intString.write(protocol);
- BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
- seqFileWriter.append(NullWritable.get(), bytesWritable);
- }
- }
+ for (int i = 1; i <= 100; i++) {
+ if (i % 10 == 0) {
+ seqFileWriter.append(NullWritable.get(), new BytesWritable("bad record".getBytes()));
+ } else {
+ out.reset();
+ IntString intString = new IntString(i, Integer.toString(i), i);
+ intString.write(protocol);
+ BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+ seqFileWriter.append(NullWritable.get(), bytesWritable);
+ }
+ }
- seqFileWriter.close();
+ seqFileWriter.close();
- // Now let's load this file into a new Hive table.
- Assert.assertEquals(0, driver.run("drop table if exists test_bad_records").getResponseCode());
- Assert.assertEquals(0, driver.run(
- "create table test_bad_records " +
- "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
- "with serdeproperties ( " +
- " 'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
- " 'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
- "stored as" +
- " inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
- " outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
- .getResponseCode());
- Assert.assertEquals(0, driver.run("load data local inpath '" + intStringSeq.getParent() +
- "' into table test_bad_records").getResponseCode());
+ // Now let's load this file into a new Hive table.
+ Assert.assertEquals(0, driver.run("drop table if exists test_bad_records").getResponseCode());
+ Assert.assertEquals(0, driver.run(
+ "create table test_bad_records " +
+ "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+ "with serdeproperties ( " +
+ " 'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+ " 'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+ "stored as" +
+ " inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+ " outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+ .getResponseCode());
+ Assert.assertEquals(0, driver.run("load data local inpath '" + intStringSeq.getParent() +
+ "' into table test_bad_records").getResponseCode());
- setUpComplete = true;
- }
+ setUpComplete = true;
+ }
- @Test
- public void testBadRecordHandlingPasses() throws Exception {
- Assert.assertTrue(runJob(0.1f));
- }
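+ // Every 10th record is bad (~10%), so a 10% threshold should let the job succeed.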
+ @Test
+ public void testBadRecordHandlingPasses() throws Exception {
+ Assert.assertTrue(runJob(0.1f));
+ }
- @Test
- public void testBadRecordHandlingFails() throws Exception {
- Assert.assertFalse(runJob(0.01f));
- }
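+ // With only a 1% threshold, the ~10% bad records should cause the job to fail.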
+ @Test
+ public void testBadRecordHandlingFails() throws Exception {
+ Assert.assertFalse(runJob(0.01f));
+ }
- private boolean runJob(float badRecordThreshold) throws Exception {
- Configuration conf = new Configuration();
+ private boolean runJob(float badRecordThreshold) throws Exception {
+ Configuration conf = new Configuration();
- conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, badRecordThreshold);
+ conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, badRecordThreshold);
- Job job = new Job(conf);
- job.setJarByClass(this.getClass());
- job.setMapperClass(MyMapper.class);
+ Job job = new Job(conf);
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MyMapper.class);
- job.setInputFormatClass(HCatInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
- HCatInputFormat.setInput(job, InputJobInfo.create("default", "test_bad_records", null));
+ HCatInputFormat.setInput(job, InputJobInfo.create("default", "test_bad_records", null));
- job.setMapOutputKeyClass(HCatRecord.class);
- job.setMapOutputValueClass(HCatRecord.class);
+ job.setMapOutputKeyClass(HCatRecord.class);
+ job.setMapOutputValueClass(HCatRecord.class);
- job.setNumReduceTasks(0);
+ job.setNumReduceTasks(0);
- Path path = new Path(TEST_DATA_DIR, "test_bad_record_handling_output");
- if (path.getFileSystem(conf).exists(path)) {
- path.getFileSystem(conf).delete(path, true);
- }
+ Path path = new Path(TEST_DATA_DIR, "test_bad_record_handling_output");
+ if (path.getFileSystem(conf).exists(path)) {
+ path.getFileSystem(conf).delete(path, true);
+ }
- TextOutputFormat.setOutputPath(job, path);
+ TextOutputFormat.setOutputPath(job, path);
- return job.waitForCompletion(true);
- }
+ return job.waitForCompletion(true);
+ }
- public static class MyMapper extends Mapper<NullWritable, HCatRecord, NullWritable, Text> {
- @Override
- public void map(NullWritable key, HCatRecord value, Context context)
- throws IOException, InterruptedException {
- LOG.info("HCatRecord: " + value);
- context.write(NullWritable.get(), new Text(value.toString()));
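+ // Pass-through mapper: logs each HCatRecord read by HCatInputFormat and writes it back out as text.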
+ public static class MyMapper extends Mapper<NullWritable, HCatRecord, NullWritable, Text> {
+ @Override
+ public void map(NullWritable key, HCatRecord value, Context context)
+ throws IOException, InterruptedException {
+ LOG.info("HCatRecord: " + value);
+ context.write(NullWritable.get(), new Text(value.toString()));
+ }
}
- }
}
Index: src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (revision 1381792)
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (working copy)
@@ -46,120 +46,122 @@
public class TestHCatOutputFormat extends TestCase {
- private static final Logger LOG = LoggerFactory.getLogger(TestHCatOutputFormat.class);
- private HiveMetaStoreClient client;
- private HiveConf hiveConf;
+ private static final Logger LOG = LoggerFactory.getLogger(TestHCatOutputFormat.class);
+ private HiveMetaStoreClient client;
+ private HiveConf hiveConf;
- private static final String dbName = "hcatOutputFormatTestDB";
- private static final String tblName = "hcatOutputFormatTestTable";
+ private static final String dbName = "hcatOutputFormatTestDB";
+ private static final String tblName = "hcatOutputFormatTestTable";
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- hiveConf = new HiveConf(this.getClass());
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ hiveConf = new HiveConf(this.getClass());
- try {
- client = new HiveMetaStoreClient(hiveConf, null);
+ try {
+ client = new HiveMetaStoreClient(hiveConf, null);
- initTable();
- } catch (Throwable e) {
- LOG.error("Unable to open the metastore", e);
- throw new Exception(e);
+ initTable();
+ } catch (Throwable e) {
+ LOG.error("Unable to open the metastore", e);
+ throw new Exception(e);
+ }
}
- }
- @Override
- protected void tearDown() throws Exception {
- try {
- super.tearDown();
- client.dropTable(dbName, tblName);
- client.dropDatabase(dbName);
+ @Override
+ protected void tearDown() throws Exception {
+ try {
+ super.tearDown();
+ client.dropTable(dbName, tblName);
+ client.dropDatabase(dbName);
- client.close();
- } catch (Throwable e) {
- LOG.error("Unable to close metastore", e);
- throw new Exception(e);
+ client.close();
+ } catch (Throwable e) {
+ LOG.error("Unable to close metastore", e);
+ throw new Exception(e);
+ }
}
- }
- private void initTable() throws Exception {
+ private void initTable() throws Exception {
- try {
- client.dropTable(dbName, tblName);
- } catch(Exception e) {}
- try {
- client.dropDatabase(dbName);
- } catch(Exception e) {}
- client.createDatabase(new Database(dbName, "", null,null));
- assertNotNull((client.getDatabase(dbName).getLocationUri()));
+ try {
+ client.dropTable(dbName, tblName);
+ } catch (Exception e) {
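+ // ignored: the table may not exist yet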
+ }
+ try {
+ client.dropDatabase(dbName);
+ } catch (Exception e) {
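+ // ignored: the database may not exist yet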
+ }
+ client.createDatabase(new Database(dbName, "", null, null));
+ assertNotNull((client.getDatabase(dbName).getLocationUri()));
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- fields.add(new FieldSchema("colname", Constants.STRING_TYPE_NAME, ""));
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("colname", Constants.STRING_TYPE_NAME, ""));
- Table tbl = new Table();
- tbl.setDbName(dbName);
- tbl.setTableName(tblName);
- StorageDescriptor sd = new StorageDescriptor();
- sd.setCols(fields);
- tbl.setSd(sd);
+ Table tbl = new Table();
+ tbl.setDbName(dbName);
+ tbl.setTableName(tblName);
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(fields);
+ tbl.setSd(sd);
- //sd.setLocation("hdfs://tmp");
- sd.setInputFormat(RCFileInputFormat.class.getName());
- sd.setOutputFormat(RCFileOutputFormat.class.getName());
- sd.setParameters(new HashMap<String, String>());
- sd.getParameters().put("test_param_1", "Use this for comments etc");
- //sd.setBucketCols(new ArrayList(2));
- //sd.getBucketCols().add("name");
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setName(tbl.getTableName());
- sd.getSerdeInfo().setParameters(new HashMap<String, String>());
- sd.getSerdeInfo().getParameters().put(
- org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
- sd.getSerdeInfo().setSerializationLib(
- org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
- tbl.setPartitionKeys(fields);
+ //sd.setLocation("hdfs://tmp");
+ sd.setInputFormat(RCFileInputFormat.class.getName());
+ sd.setOutputFormat(RCFileOutputFormat.class.getName());
+ sd.setParameters(new HashMap<String, String>());
+ sd.getParameters().put("test_param_1", "Use this for comments etc");
+ //sd.setBucketCols(new ArrayList(2));
+ //sd.getBucketCols().add("name");
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().setSerializationLib(
+ org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+ tbl.setPartitionKeys(fields);
- Map<String, String> tableParams = new HashMap<String, String>();
- tableParams.put("hcat.testarg", "testArgValue");
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tableParams.put("hcat.testarg", "testArgValue");
- tbl.setParameters(tableParams);
+ tbl.setParameters(tableParams);
- client.createTable(tbl);
- Path tblPath = new Path(client.getTable(dbName, tblName).getSd().getLocation());
- assertTrue(tblPath.getFileSystem(hiveConf).mkdirs(new Path(tblPath,"colname=p1")));
+ client.createTable(tbl);
+ Path tblPath = new Path(client.getTable(dbName, tblName).getSd().getLocation());
+ assertTrue(tblPath.getFileSystem(hiveConf).mkdirs(new Path(tblPath, "colname=p1")));
- }
+ }
- public void testSetOutput() throws Exception {
- Configuration conf = new Configuration();
- Job job = new Job(conf, "test outputformat");
+ public void testSetOutput() throws Exception {
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "test outputformat");
- Map<String, String> partitionValues = new HashMap<String, String>();
- partitionValues.put("colname", "p1");
- //null server url means local mode
- OutputJobInfo info = OutputJobInfo.create(dbName, tblName, partitionValues);
+ Map<String, String> partitionValues = new HashMap<String, String>();
+ partitionValues.put("colname", "p1");
+ //null server url means local mode
+ OutputJobInfo info = OutputJobInfo.create(dbName, tblName, partitionValues);
- HCatOutputFormat.setOutput(job, info);
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(job);
+ HCatOutputFormat.setOutput(job, info);
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(job);
- assertNotNull(jobInfo.getTableInfo());
- assertEquals(1, jobInfo.getPartitionValues().size());
- assertEquals("p1", jobInfo.getPartitionValues().get("colname"));
- assertEquals(1, jobInfo.getTableInfo().getDataColumns().getFields().size());
- assertEquals("colname", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
+ assertNotNull(jobInfo.getTableInfo());
+ assertEquals(1, jobInfo.getPartitionValues().size());
+ assertEquals("p1", jobInfo.getPartitionValues().get("colname"));
+ assertEquals(1, jobInfo.getTableInfo().getDataColumns().getFields().size());
+ assertEquals("colname", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
- publishTest(job);
- }
+ publishTest(job);
+ }
- public void publishTest(Job job) throws Exception {
- OutputCommitter committer = new FileOutputCommitterContainer(job,null);
- committer.cleanupJob(job);
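+ // After cleanup, the partition should be registered in the metastore with the table's storer properties.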
+ public void publishTest(Job job) throws Exception {
+ OutputCommitter committer = new FileOutputCommitterContainer(job, null);
+ committer.cleanupJob(job);
- Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
- assertNotNull(part);
+ Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
+ assertNotNull(part);
- StorerInfo storer = InternalUtil.extractStorerInfo(part.getSd(),part.getParameters());
- assertEquals(storer.getProperties().get("hcat.testarg"), "testArgValue");
- assertTrue(part.getSd().getLocation().indexOf("p1") != -1);
- }
+ StorerInfo storer = InternalUtil.extractStorerInfo(part.getSd(), part.getParameters());
+ assertEquals(storer.getProperties().get("hcat.testarg"), "testArgValue");
+ assertTrue(part.getSd().getLocation().indexOf("p1") != -1);
+ }
}
Index: src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java (revision 1381792)
+++ src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java (working copy)
@@ -38,41 +38,41 @@
* Simplify writing HCatalog tests that require a HiveMetaStore.
*/
public class HCatBaseTest {
- protected static final Logger LOG = LoggerFactory.getLogger(HCatBaseTest.class);
- protected static final String TEST_DATA_DIR = System.getProperty("user.dir") +
- "/build/test/data/" + HCatBaseTest.class.getCanonicalName();
- protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+ protected static final Logger LOG = LoggerFactory.getLogger(HCatBaseTest.class);
+ protected static final String TEST_DATA_DIR = System.getProperty("user.dir") +
+ "/build/test/data/" + HCatBaseTest.class.getCanonicalName();
+ protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
- protected HiveConf hiveConf = null;
- protected Driver driver = null;
- protected HiveMetaStoreClient client = null;
+ protected HiveConf hiveConf = null;
+ protected Driver driver = null;
+ protected HiveMetaStoreClient client = null;
- @BeforeClass
- public static void setUpTestDataDir() throws Exception {
- LOG.info("Using warehouse directory " + TEST_WAREHOUSE_DIR);
- File f = new File(TEST_WAREHOUSE_DIR);
- if (f.exists()) {
- FileUtil.fullyDelete(f);
+ @BeforeClass
+ public static void setUpTestDataDir() throws Exception {
+ LOG.info("Using warehouse directory " + TEST_WAREHOUSE_DIR);
+ File f = new File(TEST_WAREHOUSE_DIR);
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ Assert.assertTrue(new File(TEST_WAREHOUSE_DIR).mkdirs());
}
- Assert.assertTrue(new File(TEST_WAREHOUSE_DIR).mkdirs());
- }
- @Before
- public void setUp() throws Exception {
- if (driver == null) {
- hiveConf = new HiveConf(this.getClass());
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
- hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
- driver = new Driver(hiveConf);
- client = new HiveMetaStoreClient(hiveConf);
- SessionState.start(new CliSessionState(hiveConf));
+ @Before
+ public void setUp() throws Exception {
+ if (driver == null) {
+ hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
+ driver = new Driver(hiveConf);
+ client = new HiveMetaStoreClient(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ }
}
- }
- protected void logAndRegister(PigServer server, String query) throws IOException {
- LOG.info("Registering pig query: " + query);
- server.registerQuery(query);
- }
+ protected void logAndRegister(PigServer server, String query) throws IOException {
+ LOG.info("Registering pig query: " + query);
+ server.registerQuery(query);
+ }
}
Index: src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java (revision 1381792)
+++ src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java (working copy)
@@ -53,10 +53,10 @@
import org.junit.Test;
public class TestSequenceFileReadWrite extends TestCase {
- private static final String TEST_DATA_DIR = System.getProperty("user.dir") +
- "/build/test/data/" + TestSequenceFileReadWrite.class.getCanonicalName();
- private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
- private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data";
+ private static final String TEST_DATA_DIR = System.getProperty("user.dir") +
+ "/build/test/data/" + TestSequenceFileReadWrite.class.getCanonicalName();
+ private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+ private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data";
private static Driver driver;
private static PigServer server;
@@ -86,7 +86,7 @@
}
@Test
- public void testSequenceTableWriteRead() throws Exception{
+ public void testSequenceTableWriteRead() throws Exception {
Initialize();
String createTable = "CREATE TABLE demo_table(a0 int, a1 String, a2 String) STORED AS SEQUENCEFILE";
driver.run("drop table demo_table");
@@ -112,10 +112,10 @@
numTuplesRead++;
}
assertEquals(input.length, numTuplesRead);
- }
+ }
@Test
- public void testTextTableWriteRead() throws Exception{
+ public void testTextTableWriteRead() throws Exception {
Initialize();
String createTable = "CREATE TABLE demo_table_1(a0 int, a1 String, a2 String) STORED AS TEXTFILE";
driver.run("drop table demo_table_1");
@@ -144,7 +144,7 @@
}
@Test
- public void testSequenceTableWriteReadMR() throws Exception{
+ public void testSequenceTableWriteReadMR() throws Exception {
Initialize();
String createTable = "CREATE TABLE demo_table_2(a0 int, a1 String, a2 String) STORED AS SEQUENCEFILE";
driver.run("drop table demo_table_2");
@@ -238,27 +238,27 @@
}
- public static class Map extends Mapper<LongWritable, Text, NullWritable, DefaultHCatRecord>{
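+ // Parses each comma-separated input line into a three-column HCatRecord.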
+ public static class Map extends Mapper<LongWritable, Text, NullWritable, DefaultHCatRecord> {
- public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- String[] cols = value.toString().split(",");
- DefaultHCatRecord record = new DefaultHCatRecord(3);
- record.set(0,Integer.parseInt(cols[0]));
- record.set(1,cols[1]);
- record.set(2,cols[2]);
- context.write(NullWritable.get(), record);
- }
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String[] cols = value.toString().split(",");
+ DefaultHCatRecord record = new DefaultHCatRecord(3);
+ record.set(0, Integer.parseInt(cols[0]));
+ record.set(1, cols[1]);
+ record.set(2, cols[2]);
+ context.write(NullWritable.get(), record);
+ }
}
- private HCatSchema getSchema() throws HCatException {
- HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
- schema.append(new HCatFieldSchema("a0", HCatFieldSchema.Type.INT,
- ""));
- schema.append(new HCatFieldSchema("a1",
- HCatFieldSchema.Type.STRING, ""));
- schema.append(new HCatFieldSchema("a2",
- HCatFieldSchema.Type.STRING, ""));
- return schema;
- }
+ private HCatSchema getSchema() throws HCatException {
+ HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+ schema.append(new HCatFieldSchema("a0", HCatFieldSchema.Type.INT,
+ ""));
+ schema.append(new HCatFieldSchema("a1",
+ HCatFieldSchema.Type.STRING, ""));
+ schema.append(new HCatFieldSchema("a2",
+ HCatFieldSchema.Type.STRING, ""));
+ return schema;
+ }
}
Index: src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (revision 1381792)
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (working copy)
@@ -36,309 +36,309 @@
public class TestHCatPartitioned extends HCatMapReduceTest {
- private List<HCatRecord> writeRecords;
- private List<HCatFieldSchema> partitionColumns;
+ private List<HCatRecord> writeRecords;
+ private List<HCatFieldSchema> partitionColumns;
- @Override
- protected void initialize() throws Exception {
+ @Override
+ protected void initialize() throws Exception {
- tableName = "testHCatPartitionedTable";
- writeRecords = new ArrayList<HCatRecord>();
+ tableName = "testHCatPartitionedTable";
+ writeRecords = new ArrayList<HCatRecord>();
- for(int i = 0;i < 20;i++) {
- List