Index: core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala	(revision 1374376)
+++ core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala	(working copy)
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import junit.framework.Assert._
-import java.util.Properties
-
-import kafka.api.{FetchRequestBuilder, OffsetRequest}
-import kafka.consumer.SimpleConsumer
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.scalatest.junit.JUnit3Suite
-import kafka.admin.CreateTopicCommand
-
-class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
-
-  val topic = "MagicByte0"
-  val group = "default_group"
-  val testConsumer = "consumer"
-  val kafkaProps = new Properties
-  val host = "localhost"
-  val port = TestUtils.choosePort
-  val loader = getClass.getClassLoader
-  val kafkaLogDir = loader.getResource("test-kafka-logs")
-  kafkaProps.put("brokerid", "12")
-  kafkaProps.put("port", port.toString)
-  kafkaProps.put("log.dir", kafkaLogDir.getPath)
-  kafkaProps.put("zk.connect", zkConnect.toString)
-  val configs = List(new KafkaConfig(kafkaProps))
-  var simpleConsumer: SimpleConsumer = null
-
-  override def setUp() {
-    super.setUp()
-    simpleConsumer = new SimpleConsumer(host, port, 1000000, 64*1024)
-  }
-
-  override def tearDown() {
-    simpleConsumer.close
-    super.tearDown
-  }
-
-  // test for reading data with magic byte 0
-  def testProtocolVersion0() {
-    CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
-    val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
-    var fetchOffset: Long = 0L
-    var messageCount: Int = 0
-
-    while(fetchOffset < lastOffset(0)) {
-      val fetched = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, fetchOffset, 10000).build())
-      val fetchedMessages = fetched.messageSet(topic, 0)
-      fetchedMessages.foreach(m => fetchOffset = m.offset)
-      messageCount += fetchedMessages.size
-    }
-    assertEquals(100, messageCount)
-  }
-}
Index: core/src/main/scala/kafka/message/Message.scala
===================================================================
--- core/src/main/scala/kafka/message/Message.scala	(revision 1374376)
+++ core/src/main/scala/kafka/message/Message.scala	(working copy)
@@ -25,8 +25,7 @@
  * Message byte offsets
  */
 object Message {
-  val MagicVersion1: Byte = 0
-  val MagicVersion2: Byte = 1
+  val MagicVersion: Byte = 1
   val CurrentMagicValue: Byte = 1
   val MagicOffset = 0
   val MagicLength = 1
@@ -43,13 +42,10 @@
 
   /**
    * Computes the CRC value based on the magic byte
-   * @param magic Specifies the magic byte value. Possible values are 0 and 1
-   *              0 for no compression
-   *              1 for compression
-  */
+   * @param magic Specifies the magic byte value. The only value allowed currently is 1.
+   */
   def crcOffset(magic: Byte): Int = magic match {
-    case MagicVersion1 => MagicOffset + MagicLength
-    case MagicVersion2 => AttributeOffset + AttributeLength
+    case MagicVersion => AttributeOffset + AttributeLength
     case _ => throw new UnknownMagicByteException("Magic byte value of %d is unknown".format(magic))
   }
   
@@ -57,37 +53,25 @@
 
   /**
    * Computes the offset to the message payload based on the magic byte
-   * @param magic Specifies the magic byte value. Possible values are 0 and 1
-   *              0 for no compression
-   *              1 for compression
+   * @param magic Specifies the magic byte value. The only value allowed currently is 1.
    */
   def payloadOffset(magic: Byte): Int = crcOffset(magic) + CrcLength
 
   /**
    * Computes the size of the message header based on the magic byte
-   * @param magic Specifies the magic byte value. Possible values are 0 and 1
-   *              0 for no compression
-   *              1 for compression
+   * @param magic Specifies the magic byte value. The only value allowed currently is 1.
    */
   def headerSize(magic: Byte): Int = payloadOffset(magic)
 
   /**
-   * Size of the header for magic byte 0. This is the minimum size of any message header
+   * Size of the header for magic byte 1. This is the minimum size of any message header.
    */
-  val MinHeaderSize = headerSize(0);
+  val MinHeaderSize = headerSize(1);
 }
 
 /**
  * A message. The format of an N byte message is the following:
  *
- * If magic byte is 0
- *
- * 1. 1 byte "magic" identifier to allow format changes
- *
- * 2. 4 byte CRC32 of the payload
- *
- * 3. N - 5 byte payload
- *
  * If magic byte is 1
  *
  * 1. 1 byte "magic" identifier to allow format changes
