Description
In kafka.log.append(), we convert every IOException to a KafkaStorageException, which causes the caller to shut down the broker. However, if a compressed message is corrupted, validMessages.assignOffsets() in append() can also throw an IOException while decompressing it. That exception does not indicate a storage failure, so in this case we shouldn't shut down the broker; we should just fail that particular produce request.
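
The distinction described above could be sketched roughly as follows. This is a minimal, self-contained Java illustration and not the actual Kafka code: StorageException stands in for KafkaStorageException, and CorruptMessageException, IoStep, append() and handleProduce() are hypothetical names introduced only for this example. The idea is to catch an IOException from the validation/decompression step separately from one raised by the disk write, so that only the latter is treated as fatal.

```java
import java.io.IOException;

// Hypothetical sketch; none of these types are Kafka's real classes.
class AppendErrorHandling {

    /** Stand-in for KafkaStorageException: a fatal disk error, the broker should stop. */
    static class StorageException extends RuntimeException {
        StorageException(String msg, Throwable cause) { super(msg, cause); }
    }

    /** Hypothetical request-level error: the payload is bad, the broker is healthy. */
    static class CorruptMessageException extends RuntimeException {
        CorruptMessageException(String msg, Throwable cause) { super(msg, cause); }
    }

    /** A step that may throw IOException (decompression or a disk write). */
    interface IoStep {
        void run() throws IOException;
    }

    /**
     * Runs the two phases of an append with separate error handling.
     * validateAndAssignOffsets models validMessages.assignOffsets(), which may
     * decompress the message set; writeToSegment models the write to the log file.
     */
    static void append(IoStep validateAndAssignOffsets, IoStep writeToSegment) {
        try {
            validateAndAssignOffsets.run();
        } catch (IOException e) {
            // Decompression failed: the message is corrupt, the disk is fine.
            throw new CorruptMessageException("Corrupted compressed message", e);
        }
        try {
            writeToSegment.run();
        } catch (IOException e) {
            // A genuine storage failure: keep the existing fatal behaviour.
            throw new StorageException("I/O error while appending to log", e);
        }
    }

    /** Models the caller: fail the request on corruption, shut down on storage errors. */
    static String handleProduce(IoStep validate, IoStep write) {
        try {
            append(validate, write);
            return "OK";
        } catch (CorruptMessageException e) {
            return "REQUEST_FAILED";
        } catch (StorageException e) {
            return "BROKER_SHUTDOWN";
        }
    }
}
```

With this split, a corrupted compressed message only fails its own produce request, while an IOException from the actual write still triggers the broker shutdown path.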