diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
index 5cf008e..ce6da89 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
@@ -22,10 +22,14 @@
 import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters;
 import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
 
 import com.google.common.base.Preconditions;
 
@@ -41,6 +45,13 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException
     super.checkOutputSpecs(ignored, job);
   }
 
+  @Override
+  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
+    configureAccumuloOutputFormat(job);
+
+    return super.getRecordWriter(ignored, job, name, progress);
+  }
+
   protected void configureAccumuloOutputFormat(JobConf job) throws IOException {
     AccumuloConnectionParameters cnxnParams = new AccumuloConnectionParameters(job);
 
@@ -76,16 +87,32 @@ protected void configureAccumuloOutputFormat(JobConf job) throws IOException {
 
   protected void setAccumuloConnectorInfo(JobConf conf, String username, AuthenticationToken token)
       throws AccumuloSecurityException {
-    AccumuloOutputFormat.setConnectorInfo(conf, username, token);
+    try {
+      AccumuloOutputFormat.setConnectorInfo(conf, username, token);
+    } catch (IllegalStateException e) {
+      // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e);
+    }
   }
 
   @SuppressWarnings("deprecation")
   protected void setAccumuloZooKeeperInstance(JobConf conf, String instanceName, String zookeepers) {
-    AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);
+    try {
+      AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);
+    } catch (IllegalStateException ise) {
+      // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
+          + zookeepers, ise);
+    }
   }
 
   protected void setAccumuloMockInstance(JobConf conf, String instanceName) {
-    AccumuloOutputFormat.setMockInstance(conf, instanceName);
+    try {
+      AccumuloOutputFormat.setMockInstance(conf, instanceName);
+    } catch (IllegalStateException e) {
+      // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting mock instance of " + instanceName, e);
+    }
   }
 
   protected void setDefaultAccumuloTableName(JobConf conf, String tableName) {
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java
index d168012..46c3c1a 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java
@@ -99,9 +99,6 @@ public Mutation serialize(Object obj, ObjectInspector objInspector) throws SerDe
     // The ObjectInspector for the row ID
     ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
 
-    log.info("Serializing rowId with " + value + " in " + field + " using "
-        + rowIdFactory.getClass());
-
     // Serialize the row component using the RowIdFactory. In the normal case, this will just
     // delegate back to the "local" serializeRowId method
     byte[] data = rowIdFactory.serializeRowId(value, field, output);