diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 2ab59d7..bc3262d 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -728,7 +729,8 @@ private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent aci LOG.debug("DbNotificationListener: adding write notification log for : {}", event.getMessage()); assert ((dbConn != null) && (sqlGenerator != null)); - Statement stmt =null; + Statement stmt = null; + PreparedStatement pst = null; ResultSet rs = null; String dbName = acidWriteEvent.getDatabase(); String tblName = acidWriteEvent.getTable(); @@ -754,16 +756,25 @@ private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent aci // if rs is empty then no lock is taken and thus it can not cause deadlock. long nextNLId = getNextNLId(stmt, sqlGenerator, "org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog"); - s = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" (\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\"," + - " \"WNL_DATABASE\", \"WNL_TABLE\"," + - " \"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", \"WNL_FILES\", \"WNL_EVENT_TIME\")" + - " values (" + nextNLId - + "," + acidWriteEvent.getTxnId() + "," + acidWriteEvent.getWriteId()+ "," + - quoteString(dbName)+ "," + quoteString(tblName)+ "," + quoteString(partition)+ "," + - quoteString(tableObj)+ "," + quoteString(partitionObj) + "," + quoteString(files)+ - "," + now() + ")"; - LOG.info("Going to execute insert <" + s + ">"); - stmt.execute(sqlGenerator.addEscapeCharacters(s)); + s = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" " + + "(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_DATABASE\", \"WNL_TABLE\", " + + "\"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", " + + "\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES (?,?,?,?,?,?,?,?,?,?)"; + pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s)); + pst.setLong(1, nextNLId); + pst.setLong(2, acidWriteEvent.getTxnId()); + pst.setLong(3, acidWriteEvent.getWriteId()); + pst.setString(4, dbName); + pst.setString(5, tblName); + pst.setString(6, partition); + pst.setString(7, tableObj); + pst.setString(8, partitionObj); + pst.setString(9, files); + pst.setInt(10, now()); + LOG.info("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">", nextNLId + , acidWriteEvent.getTxnId(), acidWriteEvent.getWriteId(), quoteString(dbName), quoteString(tblName), + quoteString(partition), quoteString(tableObj), quoteString(partitionObj), quoteString(files), now()); + pst.execute(); } else { String existingFiles = rs.getString(1); if (existingFiles.contains(sqlGenerator.addEscapeCharacters(files))) { @@ -774,19 +785,27 @@ private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent aci } long nlId = rs.getLong(2); files = ReplChangeManager.joinWithSeparator(Lists.newArrayList(files, existingFiles)); - s = "update \"TXN_WRITE_NOTIFICATION_LOG\" set \"WNL_TABLE_OBJ\" = " + quoteString(tableObj) + "," + - " \"WNL_PARTITION_OBJ\" = " + quoteString(partitionObj) + "," + - " \"WNL_FILES\" = " + quoteString(files) + "," + - " \"WNL_EVENT_TIME\" = " + now() + - " where \"WNL_ID\" = " + nlId; - LOG.info("Going to execute update <" + s + ">"); - stmt.executeUpdate(sqlGenerator.addEscapeCharacters(s)); + s = "update \"TXN_WRITE_NOTIFICATION_LOG\" set \"WNL_TABLE_OBJ\" = ? ," + + " \"WNL_PARTITION_OBJ\" = ? ," + + " \"WNL_FILES\" = ? ," + + " \"WNL_EVENT_TIME\" = ?" + + " where \"WNL_ID\" = ?"; + pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s)); + pst.setString(1, tableObj); + pst.setString(2, partitionObj); + pst.setString(3, files); + pst.setInt(4, now()); + pst.setLong(5, nlId); + LOG.info("Going to execute update <" + s.replaceAll("\\?", "{}") + ">", quoteString(tableObj), + quoteString(partitionObj), quoteString(files), now(), nlId); + pst.executeUpdate(); } } catch (SQLException e) { LOG.warn("failed to add write notification log" + e.getMessage()); throw e; } finally { closeStmt(stmt); + closeStmt(pst); close(rs); } }