diff --git pom.xml pom.xml
index 41f5337..f5fba08 100644
--- pom.xml
+++ pom.xml
@@ -41,6 +41,7 @@
 <module>ql</module>
 <module>serde</module>
 <module>service</module>
+<module>streaming</module>
 <module>shims</module>
 <module>testutils</module>
 <module>packaging</module>
diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java
index 1458075..8dfe24f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java
@@ -56,8 +56,7 @@ public void run(HookContext hookContext) throws Exception {
this.run(ss,inputs,outputs,ugi);
}
- public void run(SessionState sess, Set<ReadEntity> inputs,
- Set<WriteEntity> outputs, UserGroupInformation ugi)
+ public void run(SessionState sess, Set<ReadEntity> inputs, Set<WriteEntity> outputs, UserGroupInformation ugi)
throws Exception {
if (sess.getConf().getBoolean("hive.test.init.phase", false) == true) {
return;
diff --git streaming/pom.xml streaming/pom.xml
new file mode 100644
index 0000000..eae34b4
--- /dev/null
+++ streaming/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>0.13.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-streaming</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Streaming</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-serde</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+      <version>${hadoop-20S.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git streaming/src/java/org/apache/hive/streaming/AbstractLazySimpleRecordWriter.java streaming/src/java/org/apache/hive/streaming/AbstractLazySimpleRecordWriter.java
new file mode 100644
index 0000000..dd773d9
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/AbstractLazySimpleRecordWriter.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * Uses LazySimpleSerDe to encode the record.
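+ * The raw delimited bytes are deserialized with LazySimpleSerDe into a row object,
+ * which is then handed to an ORC RecordUpdater for the target partition and bucket.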
+ */
+public abstract class AbstractLazySimpleRecordWriter implements RecordWriter {
+
+ private final Path partitionPath;
+ private final int totalBuckets;
+
+ private OrcOutputFormat outf = new OrcOutputFormat();
+ private HiveConf conf = new HiveConf();
+ private RecordUpdater updater = null;
+ private final ArrayList<String> tableColumns;
+ private final static String serdeClassName = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+ private LazySimpleSerDe serde;
+ private ObjectInspector inspector;
+ private Random rand = new Random();
+ private int currentBucketId = 0;
+
+ public final static char outputFieldSeparator
+ = (char)LazySimpleSerDe.DefaultSeparators[0];
+
+
+ protected AbstractLazySimpleRecordWriter(HiveEndPoint endPoint)
+ throws ConnectionError, StreamingException, ClassNotFoundException {
+ try {
+ if(endPoint.metaStoreUri!=null) {
+ conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri);
+ }
+
+ Hive hive = Hive.get(conf, false);
+
+ Table tbl = hive.getTable(endPoint.database, endPoint.table);
+ Properties tableProps = MetaStoreUtils.getTableMetadata(tbl.getTTable());
+
+ this.serde = createSerde(tableProps, conf);
+ this.inspector = this.serde.getObjectInspector();
+ this.tableColumns = getPartitionCols(tbl);
+ this.partitionPath = getPathForPartition(hive, endPoint.database,
+ endPoint.table, endPoint.partitionVals);
+ this.totalBuckets = hive.getTable(endPoint.database, endPoint.table)
+ .getNumBuckets();
+
+ } catch (HiveException e) {
+ throw new ConnectionError("Problem connecting to Hive", e);
+ } catch (SerDeException e) {
+ throw new StreamingException("Failed to create serde " + serdeClassName, e);
+ }
+
+ }
+
+ private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
+ throws StreamingException {
+ try {
+ return outf.getRecordUpdater(partitionPath,
+ new AcidOutputFormat.Options(conf)
+ .inspector(inspector)
+ .bucket(bucketId)
+ .minimumTransactionId(minTxnId)
+ .maximumTransactionId(maxTxnID)
+ .useDummy(System.out) );
+ } catch (IOException e) {
+ throw new StreamingException("Failed to get record updater", e);
+ }
+ }
+
+ protected abstract byte[] reorderFields(byte[] record)
+ throws UnsupportedEncodingException;
+
+ public ArrayList<String> getTableColumns() {
+ return tableColumns;
+ }
+
+ @Override
+ public void write(long transactionId, byte[] record)
+ throws SerializationError, StreamingIOFailure {
+ try {
+ byte[] orderedFields = reorderFields(record);
+ Object encodedRow = encode(orderedFields);
+ updater.insert(transactionId, currentBucketId, encodedRow);
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Error writing record in transaction("
+ + transactionId + ")", e);
+ }
+ }
+
+ @Override
+ public void flush() throws StreamingIOFailure {
+ try {
+ updater.flush();
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Unable to flush recordUpdater", e);
+ }
+ }
+
+
+ @Override
+ public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException {
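+ // Pick one bucket at random for this batch; every record written in this
+ // batch goes to the RecordUpdater for that bucket.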
+ this.currentBucketId = rand.nextInt(totalBuckets);
+ updater = createRecordUpdater(currentBucketId, minTxnId, maxTxnID);
+ }
+
+ @Override
+ public void closeBatch() throws StreamingIOFailure {
+ try {
+ updater.close(false);
+ updater = null;
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Unable to close recordUpdater", e);
+ }
+ }
+
+ private Object encode(byte[] record) throws SerializationError {
+ try {
+ BytesWritable blob = new BytesWritable();
+ blob.set(record, 0, record.length);
+ return serde.deserialize(blob);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to convert byte[] record into Object", e);
+ }
+ }
+
+ private static LazySimpleSerDe createSerde(Properties tableProps, HiveConf conf)
+ throws ClassNotFoundException, SerializationError {
+ try {
+ Class<?> serdeClass = Class.forName(serdeClassName);
+ LazySimpleSerDe serde =
+ (LazySimpleSerDe) ReflectionUtils.newInstance(serdeClass, conf);
+ serde.initialize(conf, tableProps);
+ return serde;
+ } catch (SerDeException e) {
+ throw new SerializationError("Error initializing serde", e);
+ }
+ }
+
+ private Path getPathForPartition(Hive hive, String database, String table,
+ List<String> partitionVals)
+ throws StreamingException {
+ try {
+ IMetaStoreClient msClient = hive.getMSC();
+ String location =
+ msClient.getPartition(database,table,partitionVals).getSd().getLocation();
+ msClient.close();
+ return new Path(location);
+ } catch (TException e) {
+ throw new StreamingException(e.getMessage()
+ + ". Unable to get partitionPath for specified partition: "
+ + partitionVals, e);
+ }
+ }
+
+ private ArrayList<String> getPartitionCols(Table table) {
+ List<FieldSchema> cols = table.getCols();
+ ArrayList<String> colNames = new ArrayList<String>(cols.size());
+ for(FieldSchema col : cols) {
+ colNames.add(col.getName().toLowerCase());
+ }
+ return colNames;
+ }
+
+}
diff --git streaming/src/java/org/apache/hive/streaming/ConnectionError.java streaming/src/java/org/apache/hive/streaming/ConnectionError.java
new file mode 100644
index 0000000..e0ff1da
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/ConnectionError.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class ConnectionError extends StreamingException {
+
+ public ConnectionError(String msg, Exception innerEx) {
+ super(msg, innerEx);
+ }
+ public ConnectionError(Exception e) {
+ super(e.getMessage(), e);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
new file mode 100644
index 0000000..1ce3086
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+/**
+ * Streaming writer that handles delimited input (e.g. CSV).
+ * Delimited input is parsed and reordered to match the column order of the table.
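+ * <p>
+ * For example (illustrative only): if the table columns are (id, msg) but incoming
+ * records arrive as "msg,id", constructing the writer with
+ * colNamesForFields = {"msg", "id"} maps each delimited field to its table column.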
+ */
+public class DelimitedInputWriter extends AbstractLazySimpleRecordWriter {
+ private String delimiter;
+ private int[] fieldToColMapping;
+
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint)
+ throws ClassNotFoundException, StreamingException, IOException {
+ super(endPoint);
+ this.delimiter = delimiter;
+
+ this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns());
+ }
+
+ private int[] getFieldReordering(String[] colNamesForFields, List<String> tableColNames)
+ throws InvalidColumn {
+ int[] result = new int[ colNamesForFields.length ];
+    for(int i = 0; i < colNamesForFields.length; i++) {
+      result[i] = tableColNames.indexOf(colNamesForFields[i].toLowerCase());
+      if(result[i] == -1) {
+        throw new InvalidColumn("Field '" + colNamesForFields[i]
+                + "' does not match any column in the table");
+      }
+    }
+    return result;
+  }
+
+  @Override
+  protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException {
+    // Split the delimited record, place each field at the position of its
+    // table column, and re-join using the serde's default field separator.
+    String[] fields = new String(record, "UTF-8").split(delimiter);
+    String[] reordered = new String[getTableColumns().size()];
+    for(int i = 0; i < fieldToColMapping.length; i++) {
+      reordered[fieldToColMapping[i]] = fields[i];
+    }
+    StringBuilder buf = new StringBuilder();
+    for(int i = 0; i < reordered.length; i++) {
+      if(i > 0) {
+        buf.append(outputFieldSeparator);
+      }
+      if(reordered[i] != null) {
+        buf.append(reordered[i]);
+      }
+    }
+    return buf.toString().getBytes("UTF-8");
+  }
+}
diff --git streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
new file mode 100644
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Describes a Hive end point (database, table and partition) to stream into.
+ */
+public class HiveEndPoint {
+  public final String metaStoreUri;
+  public final String database;
+  public final String table;
+  public final List<String> partitionVals;
+
+
+ public HiveEndPoint(String metaStoreUri
+ , String database, String table, List partitionVals) {
+ this.metaStoreUri = metaStoreUri;
+ this.database = database;
+ this.table = table;
+ this.partitionVals = partitionVals;
+ }
+
+ public StreamingConnection newConnection(String user, boolean createPartIfNotExists)
+ throws ConnectionError, InvalidPartition, ClassNotFoundException
+ , StreamingException {
+ if(createPartIfNotExists) {
+ createPartitionIfNotExists();
+ }
+ return new ConnectionImpl(this, user);
+ }
+
+ private boolean createPartitionIfNotExists() throws InvalidTable, StreamingException {
+ HiveConf conf = new HiveConf();
+ IMetaStoreClient msClient = getMetaStoreClient(this, conf);
+
+ try {
+ Partition part = new Partition();
+ try {
+ Table table1 = msClient.getTable(database, table);
+ part.setDbName(database);
+ part.setTableName(table);
+ part.setValues(partitionVals);
+ part.setParameters(new HashMap<String, String>());
+ part.setSd(table1.getSd());
+ } catch (NoSuchObjectException e) {
+ throw new InvalidTable(database, table);
+ } catch (TException e) {
+ throw new StreamingException("Cannot connect to table DB:"
+ + database + ", Table: " + table, e);
+ }
+
+ try {
+ msClient.add_partition(part);
+ return true;
+ } catch (AlreadyExistsException e) {
+ return false;
+ } catch (TException e) {
+ throw new StreamingException("Partition creation failed");
+ }
+ } finally {
+ msClient.close();
+ }
+ }
+
+
+ // uses embedded store if endpoint.metastoreUri is null
+ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf)
+ throws ConnectionError {
+ if(endPoint.metaStoreUri== null) {
+ TxnDbUtil.setConfValues(conf);
+ } else {
+ conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri);
+ }
+
+ try {
+ return Hive.get(conf).getMSC();
+ } catch (MetaException e) {
+ throw new ConnectionError("Error connecting to Hive Metastore URI: "
+ + endPoint.metaStoreUri, e);
+ } catch (HiveException e) {
+ throw new ConnectionError("Error connecting to Hive Metastore URI: "
+ + endPoint.metaStoreUri, e);
+ }
+ }
+
+
+
+ private static class ConnectionImpl implements StreamingConnection {
+ private final HiveConf conf;
+ private final IMetaStoreClient msClient;
+ private final LockRequest lockRequest;
+ private String user;
+ private Random rand = new Random(); // used for generating bucketIDs
+
+ private ConnectionImpl(HiveEndPoint endPoint, String user)
+ throws ConnectionError, InvalidPartition, ClassNotFoundException {
+ this.conf = new HiveConf();
+ this.msClient = getMetaStoreClient(endPoint, conf);
+ this.user = user;
+ this.lockRequest = createLockRequest(endPoint);
+ }
+
+ public void close() {
+ msClient.close();
+ }
+
+ @Override
+ public TransactionBatch fetchTransactionBatch(int numTransactions,
+ RecordWriter recordWriter)
+ throws ConnectionError, InvalidPartition, StreamingException {
+ return new TransactionBatchImpl(user, numTransactions, msClient,
+ lockRequest, recordWriter);
+ }
+
+ private static LockRequest createLockRequest(HiveEndPoint hiveEndPoint)
+ throws InvalidPartition {
+ LockRequestBuilder rqstBuilder = new LockRequestBuilder();
+
+ for( String partition : hiveEndPoint.partitionVals ) {
+ rqstBuilder.addLockComponent(new LockComponentBuilder()
+ .setDbName(hiveEndPoint.database)
+ .setTableName(hiveEndPoint.table)
+ .setPartitionName(partition)
+ .setShared()
+ .build());
+ }
+ return rqstBuilder.build();
+ }
+ } // class ConnectionImpl
+
+ private static class TransactionBatchImpl implements TransactionBatch {
+ private final List<Long> txnIds;
+ private int currentTxnIndex;
+ private final IMetaStoreClient msClient;
+ private final RecordWriter recordWriter;
+
+ private TxnState state;
+ private final LockRequest lockRequest;
+
+ private TransactionBatchImpl(String user, int numTxns, IMetaStoreClient msClient,
+ LockRequest lockRequest,
+ RecordWriter recordWriter)
+ throws StreamingException {
+ try {
+ this.msClient = msClient;
+
+ this.lockRequest = lockRequest;
+ this.recordWriter = recordWriter;
+ this.txnIds = msClient.openTxns(user, numTxns).getTxn_ids();
+ this.currentTxnIndex = -1;
+ this.state = TxnState.INACTIVE;
+ recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1));
+ } catch (TException e) {
+ throw new ConnectionError("Unable to fetch new transaction batch", e);
+ }
+ }
+
+ public void beginNextTransaction() throws StreamingException {
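+ // Each transaction in the batch re-acquires the shared partition locks
+ // before any records are written to it.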
+ if(currentTxnIndex + 1 >= txnIds.size())
+ throw new InvalidTrasactionState("No more transactions available in" +
+ " current batch");
+ ++currentTxnIndex;
+
+ try {
+ LockResponse res = msClient.lock(lockRequest);
+ if(res.getState() != LockState.ACQUIRED) {
+ throw new StreamingException("Unable to acquire partition lock");
+ }
+ } catch (TException e) {
+ throw new StreamingException("Unable to acquire partition lock", e);
+ }
+
+ state = TxnState.OPEN;
+ }
+
+ public Long getCurrentTxnId() {
+ return txnIds.get(currentTxnIndex);
+ }
+
+ public TxnState getCurrentTransactionState() {
+ return state;
+ }
+
+ // active txn is not considered part of remaining txn
+ public int remainingTransactions() {
+ return txnIds.size() - currentTxnIndex - 1;
+ }
+
+
+ /** Write using RecordWriter */
+ @Override
+ public void write(byte[] record)
+ throws ConnectionError, IOException, StreamingException {
+ recordWriter.write(getCurrentTxnId(), record);
+ }
+
+ public void write(Collection<byte[]> records)
+ throws ConnectionError, IOException, StreamingException {
+ for(byte[] record : records) {
+ write(record);
+ }
+ }
+
+
+ @Override
+ public void commit() throws StreamingException {
+ try {
+ recordWriter.flush();
+ msClient.commitTxn(txnIds.get(currentTxnIndex));
+ state = TxnState.COMMITTED;
+ } catch (NoSuchTxnException e) {
+ throw new InvalidTrasactionState("Invalid transaction id : "
+ + getCurrentTxnId(), e);
+ } catch (TxnAbortedException e) {
+ throw new InvalidTrasactionState("Aborted transaction cannot be committed"
+ , e);
+ } catch (TException e) {
+ throw new StreamingException("Unable to commit transaction"
+ + getCurrentTxnId(), e);
+ }
+ }
+
+ @Override
+ public void abort() throws StreamingException {
+ try {
+ msClient.rollbackTxn(getCurrentTxnId());
+ state = TxnState.ABORTED;
+ } catch (NoSuchTxnException e) {
+ throw new InvalidTrasactionState("Invalid transaction id : "
+ + getCurrentTxnId(), e);
+ } catch (TException e) {
+ throw new StreamingException("Unable to abort transaction id : "
+ + getCurrentTxnId(), e);
+ }
+ }
+
+ @Override
+ public void close() throws StreamingException {
+ state = TxnState.INACTIVE;
+ recordWriter.closeBatch();
+ }
+ } // class TransactionBatchImpl
+
+} // class HiveEndPoint
+
+// Racing to create new partition
diff --git streaming/src/java/org/apache/hive/streaming/InvalidColumn.java streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
new file mode 100644
index 0000000..08d8fdf
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidColumn extends StreamingException {
+
+ public InvalidColumn(String msg) {
+ super(msg);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/InvalidPartition.java streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
new file mode 100644
index 0000000..94f6b6e
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidPartition extends StreamingException {
+
+ public InvalidPartition(String partitionName, String partitionValue) {
+ super("Invalid partition: Name=" + partitionName +
+ ", Value=" + partitionValue);
+ }
+
+}
diff --git streaming/src/java/org/apache/hive/streaming/InvalidTable.java streaming/src/java/org/apache/hive/streaming/InvalidTable.java
new file mode 100644
index 0000000..c92fe2a
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/InvalidTable.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTable extends ConnectionError {
+
+ private static String makeMsg(String db, String table) {
+ return "Invalid table db:" + db + ", table:" + table;
+ }
+
+ public InvalidTable(String db, String table, Exception cause) {
+ super(makeMsg(db,table), cause);
+ }
+
+ public InvalidTable(String db, String table) {
+ super(makeMsg(db,table), null);
+ }
+}
+
diff --git streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
new file mode 100644
index 0000000..80334c5
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTrasactionState extends RuntimeException {
+ public InvalidTrasactionState(String msg) {
+ super(msg);
+ }
+
+ public InvalidTrasactionState(String msg, Exception e) {
+ super(msg,e);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/RecordWriter.java streaming/src/java/org/apache/hive/streaming/RecordWriter.java
new file mode 100644
index 0000000..e70fec9
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/RecordWriter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public interface RecordWriter {
+
+ /** Write using RecordUpdater */
+ public void write(long transactionId, byte[] record) throws StreamingException;
+ /** flush records */
+ public void flush() throws StreamingException;
+
+ /** acquire a new RecordUpdater */
+ public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException;
+ /** close the RecordUpdater */
+ public void closeBatch() throws StreamingException;
+}
+
diff --git streaming/src/java/org/apache/hive/streaming/SerializationError.java streaming/src/java/org/apache/hive/streaming/SerializationError.java
new file mode 100644
index 0000000..6844aca
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/SerializationError.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public class SerializationError extends StreamingException {
+ public SerializationError(String msg, Exception e) {
+ super(msg,e);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/StreamingConnection.java streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
new file mode 100644
index 0000000..5146a5a
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
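+/**
+ * An open connection to a {@link HiveEndPoint} through which transaction
+ * batches can be obtained for streaming records into the table or partition.
+ * A rough usage sketch (database, table and values below are illustrative only):
+ * <pre>
+ *   HiveEndPoint endPt = new HiveEndPoint(metaStoreUri, "testing", "alerts", partitionVals);
+ *   RecordWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
+ *   StreamingConnection conn = endPt.newConnection(user, true);
+ *   TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
+ *   txnBatch.beginNextTransaction();
+ *   txnBatch.write("1,Hello streaming".getBytes());
+ *   txnBatch.commit();
+ *   txnBatch.close();
+ *   conn.close();
+ * </pre>
+ */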
+public interface StreamingConnection {
+
+ /**
+ * Acquire a set of transactions
+ * @param numTransactionsHint a hint about the number of desired transactions
+ * @return a batch of transactions
+ */
+ public TransactionBatch fetchTransactionBatch(int numTransactionsHint,
+ RecordWriter writer)
+ throws ConnectionError, StreamingException;
+
+ /**
+ * Close connection
+ */
+ public void close();
+
+}
diff --git streaming/src/java/org/apache/hive/streaming/StreamingException.java streaming/src/java/org/apache/hive/streaming/StreamingException.java
new file mode 100644
index 0000000..9486d6d
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/StreamingException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class StreamingException extends Exception {
+ public StreamingException(String msg, Exception cause) {
+ super(msg, cause);
+ }
+ public StreamingException(String msg) {
+ super(msg);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
new file mode 100644
index 0000000..fb09488
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public class StreamingIOFailure extends StreamingException {
+
+ public StreamingIOFailure(String msg, Exception cause) {
+ super(msg, cause);
+ }
+
+ public StreamingIOFailure(String msg) {
+ super(msg);
+ }
+}
diff --git streaming/src/java/org/apache/hive/streaming/TransactionBatch.java streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
new file mode 100644
index 0000000..cf96bef
--- /dev/null
+++ streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+
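+/**
+ * A batch of transactions obtained from a {@link StreamingConnection}.
+ * Typical lifecycle, as exercised by the tests: beginNextTransaction(), one or
+ * more write() calls, then commit() or abort(); repeat while transactions
+ * remain in the batch, and finally close() to release the underlying writer.
+ */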
+public interface TransactionBatch {
+ public enum TxnState {INACTIVE, OPEN, COMMITTED, ABORTED }
+
+ public void beginNextTransaction() throws StreamingException;
+
+ public Long getCurrentTxnId();
+
+ public TxnState getCurrentTransactionState();
+ public void commit() throws StreamingException;
+ public void abort() throws StreamingException;
+
+ // active txn is not considered part of remaining txn //
+ public int remainingTransactions();
+
+
+ // Write Data for current Txn //
+ public void write(byte[] record) throws ConnectionError, IOException, StreamingException;
+ public void write(Collection<byte[]> records) throws ConnectionError, IOException, StreamingException;
+
+ public void close() throws StreamingException;
+}
diff --git streaming/src/test/org/apache/hive/streaming/TestStreaming.java streaming/src/test/org/apache/hive/streaming/TestStreaming.java
new file mode 100644
index 0000000..5a50903
--- /dev/null
+++ streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestStreaming {
+
+ private static final String COL1 = "id";
+ private static final String COL2 = "msg";
+
+ private final IMetaStoreClient msClient;
+ private final Hive hive;
+
+ final static String metaStoreURI = null;
+
+ final static String dbName = "testing";
+ final static String tblName = "alerts";
+ final String[] fieldNames;
+
+ List<String> partitionVals;
+ private String user = "roshan";
+
+ public TestStreaming() throws MetaException, HiveException {
+ hive = Hive.get();
+ msClient = hive.getMSC();
+
+ partitionVals = new ArrayList<String>(2);
+ partitionVals.add("Asia");
+ partitionVals.add("India");
+
+ fieldNames = new String[]{COL1,COL2};
+ }
+
+
+ private static List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ //Defining partition names in unsorted order
+ fields.add(new FieldSchema("continent", serdeConstants.STRING_TYPE_NAME, ""));
+ fields.add(new FieldSchema("country", serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+
+ @Before
+ public void setup() throws Exception {
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+
+ Hive hive = Hive.get();
+ IMetaStoreClient msClient = hive.getMSC();
+ dropDB(msClient, dbName);
+ createDbAndTable(msClient, dbName, tblName, partitionVals);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ dropDB(msClient,dbName);
+ TxnDbUtil.cleanDb();
+ }
+
+ @Test
+ public void testEndpointConnection() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+ , partitionVals);
+ StreamingConnection connection = endPt.newConnection(user, false); //shouldn't throw
+ connection.close();
+ }
+
+ @Test
+ public void testAddPartition() throws Exception {
+ List<String> newPartVals = new ArrayList<String>(2);
+ newPartVals.add("Asia");
+ newPartVals.add("Nepal");
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+ , newPartVals);
+
+ // Ensure partition is absent
+ try {
+ msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+ Assert.assertTrue("Partition already exists", false);
+ } catch (NoSuchObjectException e) {
+ // expect this exception
+ }
+
+ // Create partition
+ Assert.assertNotNull( endPt.newConnection(user,true) );
+
+ // Ensure partition is present
+ Partition p = msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+ Assert.assertNotNull("Did not find added partition", p);
+ }
+
+ @Test
+ public void testTransactionBatchEmptyCommit() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(user, false);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.commit();
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchEmptyAbort() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(user, true);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.abort();
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchCommit() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(user, true);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.commit();
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ txnBatch.commit();
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchAbort() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ StreamingConnection connection = endPt.newConnection(user, false);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ txnBatch.abort();
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.close();
+ connection.close();
+ }
+
+ @Test
+ public void testMultipleTransactionBatchCommits() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(user, false);
+
+ // 1st Txn Batch
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.commit();
+ txnBatch.beginNextTransaction();
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ txnBatch.commit();
+ txnBatch.close();
+
+ // 2nd Txn Batch
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("3,Hello streaming - once again".getBytes());
+ txnBatch.commit();
+ txnBatch.beginNextTransaction();
+ txnBatch.write("4,Welcome to streaming - once again".getBytes());
+ txnBatch.commit();
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+
+ txnBatch.close();
+
+ connection.close();
+ }
+
+
+ // delete db and all tables in it
+ public static void dropDB(IMetaStoreClient client, String databaseName) {
+ try {
+ for(String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) {
+ client.dropTable(databaseName, table, true, true);
+ }
+ client.dropDatabase(databaseName);
+ } catch (TException e) {
+ }
+
+ }
+
+ public static void createDbAndTable(IMetaStoreClient client, String databaseName,
+ String tableName, List<String> partVals)
+ throws Exception {
+ Database db = new Database();
+ db.setName(databaseName);
+ client.createDatabase(db);
+
+ Table tbl = new Table();
+ tbl.setDbName(databaseName);
+ tbl.setTableName(tableName);
+ tbl.setTableType(TableType.MANAGED_TABLE.toString());
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(getTableColumns());
+ sd.setNumBuckets(10);
+ tbl.setPartitionKeys(getPartitionKeys());
+
+ tbl.setSd(sd);
+
+ sd.setBucketCols(new ArrayList<String>(2));
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+
+ sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
+ sd.setInputFormat(inputFormat());
+ sd.setOutputFormat(outputFormat());
+
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tbl.setParameters(tableParams);
+ client.createTable(tbl);
+
+ addPartition(client, tbl, partVals);
+ }
+
+
+ private static void addPartition(IMetaStoreClient client, Table tbl
+ , List<String> partValues)
+ throws IOException, TException {
+ Partition part = new Partition();
+ part.setDbName(dbName);
+ part.setTableName(tblName);
+ part.setSd(tbl.getSd());
+ part.setValues(partValues);
+ client.add_partition(part);
+ }
+
+ protected static String inputFormat() {
+ return RCFileInputFormat.class.getName();
+ }
+
+ protected static String outputFormat() {
+ return RCFileOutputFormat.class.getName();
+ }
+
+ private static List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema(COL1, serdeConstants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema(COL2, serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+}