Index: log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/KafkaAppenderTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/KafkaAppenderTest.java	(revision )
+++ log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/KafkaAppenderTest.java	(revision )
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.junit.LoggerContextRule;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+public class KafkaAppenderTest {
+
+    private static final String TOPIC_NAME = "kafka-topic";
+    private static final String LOG_MESSAGE = "Hello, world!";
+
+    private static final MockProducer kafka = new MockProducer();
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        KafkaManager.producerFactory = new KafkaProducerFactory() {
+            @Override
+            public Producer<byte[], byte[]> newKafkaProducer(Properties config) {
+                return kafka;
+            }
+        };
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        kafka.clear();
+    }
+
+    @Rule
+    public LoggerContextRule ctx = new LoggerContextRule("KafkaAppenderTest.xml");
+
+    @Test
+    public void testAppend() throws Exception {
+        Appender appender = ctx.getRequiredAppender("KafkaAppender");
+        appender.append(createLogEvent());
+        List<ProducerRecord<byte[], byte[]>> history = kafka.history();
+        assertEquals(1, history.size());
+        ProducerRecord<byte[], byte[]> item = history.get(0);
+        assertNotNull(item);
+        assertEquals(TOPIC_NAME, item.topic());
+        assertNull(item.key());
+        assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testAppendWithLayout() throws Exception {
+        Appender appender = ctx.getRequiredAppender("KafkaAppenderWithLayout");
+        appender.append(createLogEvent());
+        List<ProducerRecord<byte[], byte[]>> history = kafka.history();
+        assertEquals(1, history.size());
+        ProducerRecord<byte[], byte[]> item = history.get(0);
+        assertNotNull(item);
+        assertEquals(TOPIC_NAME, item.topic());
+        assertNull(item.key());
+        assertEquals("[" + LOG_MESSAGE + "]", new String(item.value(), StandardCharsets.UTF_8));
+    }
+
+    private static Log4jLogEvent createLogEvent() {
+        return Log4jLogEvent.newBuilder()
+            .setLoggerName(KafkaAppenderTest.class.getName())
+            .setLoggerFqcn(KafkaAppenderTest.class.getName())
+            .setLevel(Level.INFO)
+            .setMessage(new SimpleMessage(LOG_MESSAGE))
+            .build();
+    }
+
+}
\ No newline at end of file
Index: pom.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- pom.xml	(revision 546f4d0439c3cc14c6e78ad5b81c84790ee94da7)
+++ pom.xml	(revision )
@@ -559,7 +559,12 @@
           </exclusion>
         </exclusions>
       </dependency>
- 	    <dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>0.8.2.1</version>
+      </dependency>
+      <dependency>
         <groupId>javax.servlet</groupId>
         <artifactId>servlet-api</artifactId>
         <version>2.5</version>
Index: log4j-core/pom.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- log4j-core/pom.xml	(revision 546f4d0439c3cc14c6e78ad5b81c84790ee94da7)
+++ log4j-core/pom.xml	(revision )
@@ -108,6 +108,12 @@
       <scope>provided</scope>
       <optional>true</optional>
     </dependency>
+    <!-- Used for Kafka appender -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <optional>true</optional>
+    </dependency>
     <!-- Used for compressing to formats other than zip and gz -->
     <dependency>
       <groupId>org.apache.commons</groupId>
Index: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/DefaultKafkaProducerFactory.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/DefaultKafkaProducerFactory.java	(revision )
+++ log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/DefaultKafkaProducerFactory.java	(revision )
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+
+import java.util.Properties;
+
+public class DefaultKafkaProducerFactory implements KafkaProducerFactory {
+
+    @Override
+    public Producer<byte[], byte[]> newKafkaProducer(Properties config) {
+        return new KafkaProducer<>(config);
+    }
+
+}
Index: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/KafkaAppender.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/KafkaAppender.java	(revision )
+++ log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/KafkaAppender.java	(revision )
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom;
+
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.appender.AppenderLoggingException;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
+import org.apache.logging.log4j.core.util.Booleans;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Appender to send log events to an Apache Kafka topic.
+ */
+@Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
+public final class KafkaAppender extends AbstractAppender {
+
+    private final KafkaManager manager;
+
+    private KafkaAppender(String name, Layout<? extends Serializable> layout, Filter filter, boolean ignoreExceptions, KafkaManager manager) {
+        super(name, filter, layout, ignoreExceptions);
+        this.manager = manager;
+    }
+
+    @PluginFactory
+    public static KafkaAppender createAppender(
+            @PluginElement("Layout") Layout<? extends Serializable> layout,
+            @PluginElement("Filter") Filter filter,
+            @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") String name,
+            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) String ignore,
+            @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") String topic,
+            @PluginElement("Properties") Property[] properties) {
+        boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
+        KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
+        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        manager.startup();
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+        manager.release();
+    }
+
+    @Override
+    public void append(final LogEvent event) {
+        if (event.getLoggerName().startsWith("org.apache.kafka")) {
+            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
+        } else {
+            try {
+                if (getLayout() != null) {
+                    manager.send(getLayout().toByteArray(event));
+                } else {
+                    manager.send(event.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8));
+                }
+            } catch (final Exception e) {
+                LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
+                throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
+            }
+        }
+    }
+
+}
Index: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/KafkaManager.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/KafkaManager.java	(revision )
+++ log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/KafkaManager.java	(revision )
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.config.Property;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class KafkaManager extends AbstractManager {
+
+    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
+
+    /**
+     * package-private access for testing.
+     */
+    static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
+
+    private final String topic;
+    private final Properties config = new Properties();
+    private final int timeoutMillis;
+
+    private Producer<byte[], byte[]> producer = null;
+
+    public KafkaManager(String name, String topic, Property[] properties) {
+        super(name);
+        this.topic = topic;
+        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("batch.size", "0");
+        for (Property property : properties) {
+            config.setProperty(property.getName(), property.getValue());
+        }
+        this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
+    }
+
+    public void startup() {
+        producer = producerFactory.newKafkaProducer(config);
+    }
+
+    public void send(byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+        if (producer != null) {
+            producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public void releaseSub() {
+        if (producer != null) {
+            // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
+            Thread closeThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    producer.close();
+                }
+            });
+            closeThread.setName("KafkaManager-CloseThread");
+            closeThread.setDaemon(true); // avoid blocking JVM shutdown
+            closeThread.start();
+            try {
+                closeThread.join(timeoutMillis);
+            } catch (InterruptedException ignore) {
+                // ignore
+            }
+        }
+    }
+
+}
Index: log4j-core/src/site/xdoc/index.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- log4j-core/src/site/xdoc/index.xml	(revision 546f4d0439c3cc14c6e78ad5b81c84790ee94da7)
+++ log4j-core/src/site/xdoc/index.xml	(revision )
@@ -48,6 +48,7 @@
                 <li>SMTPAppender requires Javax Mail.</li>
                 <li>JMSQueueAppender and JMSTopicAppender require a JMS implementation like
                 <a href="http://activemq.apache.org/">Apache ActiveMQ</a>.</li>
+                <li>Kafka appender requires <a href="http://search.maven.org/#artifactdetails|org.apache.kafka|kafka-clients|0.8.2.1|jar">Kafka client library</a></li>
                 <li>Windows color support requires <a href="http://jansi.fusesource.org/">Jansi</a>.</li>
                 <li>The JDBC Appender requires a JDBC driver for the database you choose to write events to.</li>
                 <li>The JPA Appender requires the Java Persistence API classes, a JPA provider implementation,
\ No newline at end of file
Index: log4j-core/src/test/resources/KafkaAppenderTest.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- log4j-core/src/test/resources/KafkaAppenderTest.xml	(revision )
+++ log4j-core/src/test/resources/KafkaAppenderTest.xml	(revision )
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache license, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the license for the specific language governing permissions and
+  ~ limitations under the license.
+  -->
+<Configuration name="KafkaAppenderTest" status="OFF">
+  <Appenders>
+    <Kafka name="KafkaAppender" topic="kafka-topic">
+      <Property name="bootstrap.servers">localhost:9092</Property>
+    </Kafka>
+    <Kafka name="KafkaAppenderWithLayout" topic="kafka-topic">
+      <PatternLayout pattern="[%m]"/>
+      <Property name="bootstrap.servers">localhost:9092</Property>
+    </Kafka>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="KafkaAppender"/>
+      <AppenderRef ref="KafkaAppenderWithLayout"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file
Index: src/site/xdoc/manual/appenders.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/site/xdoc/manual/appenders.xml	(revision 546f4d0439c3cc14c6e78ad5b81c84790ee94da7)
+++ src/site/xdoc/manual/appenders.xml	(revision )
@@ -1225,6 +1225,90 @@
     ...
 }]]></pre>
         </subsection>
+        <a name="KafkaAppender"/>
+        <subsection name="KafkaAppender">
+          <p>The KafkaAppender log events to an Apache Kafka topic.</p>
+          <table>
+            <caption align="top">KafkaAppender Parameters</caption>
+            <tr>
+              <th>Parameter Name</th>
+              <th>Type</th>
+              <th>Description</th>
+            </tr>
+            <tr>
+              <td>topic</td>
+              <td>String</td>
+              <td>The Kafka topic to use. Required.</td>
+            </tr>
+            <tr>
+              <td>lilter</td>
+              <td>Filter</td>
+              <td>A Filter to determine if the event should be handled by this Appender. More than one Filter
+                may be used by using a CompositeFilter.</td>
+            </tr>
+            <tr>
+              <td>layout</td>
+              <td>Layout</td>
+              <td>
+                The Layout to use to format the LogEvent. If you do not specify a layout,
+                if not specified the <a class="javadoc" href="http://logging.apache.org/log4j/2.x/log4j-api/apidocs/org/apache/logging/log4j/message/Message.html#getFormattedMessage())">formatted message</a>
+                as an UTF-8 encoded string will be sent to Kafka.
+              </td>
+            </tr>
+            <tr>
+              <td>name</td>
+              <td>String</td>
+              <td>The name of the Appender. Required.</td>
+            </tr>
+            <tr>
+              <td>ignoreExceptions</td>
+              <td>boolean</td>
+              <td>The default is <code>true</code>, causing exceptions encountered while appending events to be
+                internally logged and then ignored. When set to <code>false</code> exceptions will be propagated to the
+                caller, instead. You must set this to <code>false</code> when wrapping this Appender in a
+                <a href="#FailoverAppender">FailoverAppender</a>.</td>
+            </tr>
+            <tr>
+              <td>properties</td>
+              <td>Property</td>
+              <td>
+                You can set any properties in <a href="http://kafka.apache.org/documentation.html#newproducerconfigs">Kafka new producer properties</a>.
+                You need to set the <code>bootstrap.servers</code> property, there are sensible default values for the others.
+              </td>
+            </tr>
+          </table>
+          <p>
+            Here is a sample KafkaAppender configuration snippet:
+          </p>
+
+          <pre class="prettyprint linenums"><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+    <Appenders>
+        <Kafka name="Kafka" topic="log-test">
+            <PatternLayout pattern="%date %message"/>
+            <Property name="bootstrap.servers">localhost:9092</Property>
+        </Kafka>
+    </Appenders>]]></pre>
+          <p>
+            This appender is synchronous and will block until the record has been acknowledged by the Kafka server, timeout for this
+            can be set with the <code>timeout.ms</code> property (defaults to 30 seconds). Wrap with
+            <a href="http://logging.apache.org/log4j/2.x/manual/appenders.html#AsyncAppender">Async appender</a> to log asynchronously.
+          </p>
+          <p>
+            This appender requires <a href="http://search.maven.org/#artifactdetails|org.apache.kafka|kafka-clients|0.8.2.1|jar">Kafka client library</a>
+          </p>
+          <p>
+            <em>Note:</em>Make sure to not let <code>org.apache.kafka</code> log to a Kafka appender on DEBUG level,
+            since that will cause recursive logging:
+
+          <pre class="prettyprint linenums"><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+            <Loggers>
+              <Root level="DEBUG">
+                <AppenderRef ref="Kafka"/>
+              </Root>
+              <Logger name="org.apache.kafka" level="INFO" /> <!-- avoid recursive logging -->
+            </Loggers>]]></pre>
+          </p>
+        </subsection>
       <a name="MemoryMappedFileAppender" />
       <subsection name="MemoryMappedFileAppender">
         <p><i>New since 2.1. Be aware that this is a new addition, and although it has been
Index: log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/KafkaProducerFactory.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/KafkaProducerFactory.java	(revision )
+++ log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/KafkaProducerFactory.java	(revision )
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom;
+
+import org.apache.kafka.clients.producer.Producer;
+
+import java.util.Properties;
+
+public interface KafkaProducerFactory {
+
+    Producer<byte[], byte[]> newKafkaProducer(Properties config);
+
+}
