diff --git core/src/main/scala/kafka/producer/SyncProducerConfig.scala core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index c3a5d12..3e307ec 100644
--- core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -46,21 +46,21 @@ trait SyncProducerConfigShared {
   val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
 
   /* the client application sending the producer requests */
-  val correlationId = Utils.getInt(props,"producer.request.correlation_id",-1)
+  val correlationId = Utils.getInt(props,"producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
 
   /* the client application sending the producer requests */
-  val clientId = Utils.getString(props,"producer.request.client_id","")
+  val clientId = Utils.getString(props,"producer.request.client_id",SyncProducerConfig.DefaultClientId)
 
   /* the required acks of the producer requests */
-  val requiredAcks = Utils.getShort(props,"producer.request.required.acks",0)
+  val requiredAcks = Utils.getShort(props,"producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
 
   /* the ack timeout of the producer requests */
-  val ackTimeoutMs = Utils.getInt(props,"producer.request.ack.timeout.ms", -1)
+  val ackTimeoutMs = Utils.getInt(props,"producer.request.ack.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs)
 }
 
 object SyncProducerConfig {
   val DefaultCorrelationId = -1
   val DefaultClientId = ""
   val DefaultRequiredAcks : Short = 0
-  val DefaultAckTimeoutMs = 1
+  val DefaultAckTimeoutMs = -1
 }
\ No newline at end of file
diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index ceaf3f7..dc44e75 100644
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -44,7 +44,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     lock synchronized {
       val serializedData = serialize(events)
       var outstandingProduceRequests = serializedData
-      var remainingRetries = config.producerRetries
+      var remainingRetries = config.producerRetries + 1
       while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
         outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
         if (outstandingProduceRequests.size > 0)  {
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index b95242b..468bf35 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -513,8 +513,8 @@ class KafkaApis(val requestChannel: RequestChannel,
               trace("Received %d/%d acks for produce request to %s-%d".format(
                 numAcks, produce.requiredAcks,
                 topic, partitionId))
-              if (numAcks >= produce.requiredAcks ||
-                (numAcks >= isr.size && produce.requiredAcks < 0)) {
+              if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
+                  (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
                 fetchPartitionStatus.acksPending = false
                 fetchPartitionStatus.error = ErrorMapping.NoError
                 val topicData =
