From 357fca8dc4c6b1e68a06468eee7fe7a9aa96e449 Mon Sep 17 00:00:00 2001
From: benwa <btellier@linagora.com>
Date: Sun, 8 Feb 2015 14:16:38 +0100
Subject: [PATCH] MAILBOX-226 Cassandra : Allow password authentication and
 clusters

MAILBOX-226 Cassandra : improve table creation
---
 cassandra/README.txt                               |  18 +++
 .../mailbox/cassandra/CassandraConstants.java      |  24 ++++
 .../CassandraMailboxSessionMapperFactory.java      |   2 +-
 .../james/mailbox/cassandra/CassandraSession.java  | 152 ---------------------
 .../mailbox/cassandra/CassandraTableManager.java   | 149 ++++++++++++++++++++
 .../james/mailbox/cassandra/ClusterFactory.java    |  69 ++++++++++
 .../ClusterWithKeyspaceCreatedFactory.java         |  32 +++++
 .../james/mailbox/cassandra/SessionFactory.java    |  39 ++++++
 .../mailbox/cassandra/mail/CassandraACLMapper.java |   4 +-
 .../mail/utils/FunctionRunnerWithRetry.java        |  25 +++-
 .../META-INF/spring/mailbox-cassandra.xml          |  26 +++-
 .../cassandra/CassandraClusterSingleton.java       | 118 ++++++----------
 .../cassandra/CassandraMailboxManagerTest.java     |  15 +-
 pom.xml                                            |   2 +-
 14 files changed, 423 insertions(+), 252 deletions(-)
 create mode 100644 cassandra/README.txt
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraConstants.java
 delete mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/ClusterFactory.java
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/ClusterWithKeyspaceCreatedFactory.java
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/SessionFactory.java

diff --git a/cassandra/README.txt b/cassandra/README.txt
new file mode 100644
index 0000000..ace4cff
--- /dev/null
+++ b/cassandra/README.txt
@@ -0,0 +1,18 @@
+= Cassandra Mailbox implementation
+
+This Mailbox sub-project is about providing a scalable mailbox implementation relying on Cassandra database.
+
+Concurrency is handled by this implementation while performing writes using Lightweight transactions. You do not need to lock anything, or provide utils to lock anything, when using this implementation.
+
+== Configuration
+
+The configuration is achieved through Spring. The file is 'src/main/resources/META-INF/spring/mailbox-cassandra.xml' .
+
+The components are instanciated and wired together.
+
+What might interest you the most is the way you want to connect your Cassandra cluster.
+
+Factories are used. You have :
+  * ClusterFactory : you specify which Cassandra servers you want to connect, with ( optional ) which user name and password to use.
+  * ClusterWithKeyspaceCreatedFactory : This ( optional ) component creates a Keyspace if it does not already exists. You may want to skip this step in production environment.
+  * SessionFactory : Connect the appropriated Keyspace, to create a Session our application can work with.
\ No newline at end of file
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraConstants.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraConstants.java
new file mode 100644
index 0000000..1c24d46
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraConstants.java
@@ -0,0 +1,24 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra;
+
+public interface CassandraConstants {
+    int LIGHTWEIGHT_TRANSACTION_APPLIED = 0;
+}
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 0f21998..54ae2b1 100644
--- a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -46,7 +46,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     private ModSeqProvider<UUID> modSeqProvider;
     private int maxRetry;
 
-    public CassandraMailboxSessionMapperFactory(CassandraUidProvider uidProvider, ModSeqProvider<UUID> modSeqProvider, CassandraSession session) {
+    public CassandraMailboxSessionMapperFactory(CassandraUidProvider uidProvider, ModSeqProvider<UUID> modSeqProvider, Session session) {
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
         this.session = session;
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
deleted file mode 100644
index 9452314..0000000
--- a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.cassandra;
-
-import com.datastax.driver.core.CloseFuture;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.RegularStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * A Cassandra session with the default keyspace
- *
- */
-public class CassandraSession implements Session {
-    private final static String DEFAULT_CLUSTER_IP = "localhost";
-    private final static int DEFAULT_CLUSTER_PORT = 9042;
-    private final static String DEFAULT_KEYSPACE_NAME = "apache_james";
-    private final static int DEFAULT_REPLICATION_FACTOR = 1;
-    
-    public static final int LIGHTWEIGHT_TRANSACTION_APPLIED = 0;
-
-    private Session session;
-
-    public CassandraSession(String ip, int port, String keyspace, int replicationFactor) {
-        Cluster cluster = Cluster.builder().addContactPoint(ip).withPort(port).build();
-        if (cluster.getMetadata().getKeyspace(keyspace) == null) {
-            initDatabase(cluster, keyspace, replicationFactor);
-        }
-        session = cluster.connect(keyspace);
-    }
-
-    private void initDatabase(Cluster cluster, String keyspace, int replicationFactor) {
-        session = cluster.connect();
-        session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH replication " + "= {'class':'SimpleStrategy', 'replication_factor':" + replicationFactor + "};");
-        session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".mailbox (" + "id uuid PRIMARY KEY," + "name text, namespace text," + "uidvalidity bigint," + "user text," + "path text" + ");");
-        session.execute("CREATE INDEX IF NOT EXISTS ON " + keyspace + ".mailbox(path);");
-        session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".messageCounter (" + "mailboxId UUID PRIMARY KEY," + "nextUid bigint," + ");");
-        session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".mailboxCounters (" + "mailboxId UUID PRIMARY KEY," + "count counter," + "unseen counter," + "nextModSeq counter" + ");");
-        session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".message (" + "mailboxId UUID," + "uid bigint," + "internalDate timestamp," + "bodyStartOctet int," + "content blob," + "modSeq bigint," + "mediaType text," + "subType text," + "fullContentOctets int," + "bodyOctets int,"
-                + "textualLineCount bigint," + "bodyContent blob," + "headerContent blob," + "flagAnswered boolean," + "flagDeleted boolean," + "flagDraft boolean," + "flagRecent boolean," + "flagSeen boolean," + "flagFlagged boolean," + "flagUser boolean," + "flagVersion bigint," + "PRIMARY KEY (mailboxId, uid)" + ");");
-        session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".subscription (" + "user text," + "mailbox text," + "PRIMARY KEY (mailbox, user)" + ");");
-        session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".acl (id uuid PRIMARY KEY, acl text, version bigint);");
-        session.close();
-    }
-
-    public CassandraSession() {
-        this(DEFAULT_CLUSTER_IP, DEFAULT_CLUSTER_PORT, DEFAULT_KEYSPACE_NAME, DEFAULT_REPLICATION_FACTOR);
-    }
-
-    @Override
-    public String getLoggedKeyspace() {
-        return session.getLoggedKeyspace();
-    }
-
-    @Override
-    public Session init() {
-        return session.init();
-    }
-
-    @Override
-    public ResultSet execute(String query) {
-        return session.execute(query);
-    }
-
-    @Override
-    public ResultSet execute(String query, Object... values) {
-        return session.execute(query, values);
-    }
-
-    @Override
-    public ResultSet execute(Statement statement) {
-        return session.execute(statement);
-    }
-
-    @Override
-    public ResultSetFuture executeAsync(String query) {
-        return session.executeAsync(query);
-    }
-
-    @Override
-    public ResultSetFuture executeAsync(String query, Object... values) {
-        return session.executeAsync(query, values);
-    }
-
-    @Override
-    public ResultSetFuture executeAsync(Statement statement) {
-        return session.executeAsync(statement);
-    }
-
-    @Override
-    public PreparedStatement prepare(String query) {
-        return session.prepare(query);
-    }
-
-    @Override
-    public PreparedStatement prepare(RegularStatement statement) {
-        return session.prepare(statement);
-    }
-
-    @Override
-    public ListenableFuture<PreparedStatement> prepareAsync(String query) {
-        return session.prepareAsync(query);
-    }
-
-    @Override
-    public ListenableFuture<PreparedStatement> prepareAsync(RegularStatement statement) {
-        return session.prepareAsync(statement);
-    }
-
-    @Override
-    public CloseFuture closeAsync() {
-        return session.closeAsync();
-    }
-
-    @Override
-    public void close() {
-        session.close();
-    }
-
-    @Override
-    public boolean isClosed() {
-        return session.isClosed();
-    }
-
-    @Override
-    public Cluster getCluster() {
-        return session.getCluster();
-    }
-
-}
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java
new file mode 100644
index 0000000..72e5d4f
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java
@@ -0,0 +1,149 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra;
+
+import static com.datastax.driver.core.DataType.*;
+
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.schemabuilder.Create;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.datastax.driver.core.schemabuilder.SchemaStatement;
+import org.apache.james.mailbox.cassandra.table.CassandraACLTable;
+import org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable;
+import org.apache.james.mailbox.cassandra.table.CassandraMailboxTable;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageTable;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable;
+import org.apache.james.mailbox.cassandra.table.CassandraSubscriptionTable;
+
+import java.util.Arrays;
+
+public class CassandraTableManager {
+
+    private Session session;
+
+    enum TABLE {
+        Mailbox(CassandraMailboxTable.TABLE_NAME,
+            SchemaBuilder.createTable(CassandraMailboxTable.TABLE_NAME)
+                .ifNotExists()
+                .addPartitionKey(CassandraMailboxTable.ID, uuid())
+                .addColumn(CassandraMailboxTable.NAMESPACE, text())
+                .addColumn(CassandraMailboxTable.USER, text())
+                .addColumn(CassandraMailboxTable.NAME, text())
+                .addColumn(CassandraMailboxTable.PATH, text())
+                .addColumn(CassandraMailboxTable.UIDVALIDITY, bigint())),
+        MailboxCounter(CassandraMailboxCountersTable.TABLE_NAME,
+            SchemaBuilder.createTable(CassandraMailboxCountersTable.TABLE_NAME)
+                .ifNotExists()
+                .addPartitionKey(CassandraMailboxCountersTable.MAILBOX_ID, uuid())
+                .addColumn(CassandraMailboxCountersTable.COUNT, counter())
+                .addColumn(CassandraMailboxCountersTable.UNSEEN, counter())
+                .addColumn(CassandraMailboxCountersTable.NEXT_MOD_SEQ, counter())),
+        MessageUid(CassandraMessageUidTable.TABLE_NAME,
+            SchemaBuilder.createTable(CassandraMessageUidTable.TABLE_NAME)
+                .ifNotExists()
+                .addPartitionKey(CassandraMessageUidTable.MAILBOX_ID, uuid())
+                .addColumn(CassandraMessageUidTable.NEXT_UID, bigint())),
+        Message(CassandraMessageTable.TABLE_NAME,
+            SchemaBuilder.createTable(CassandraMessageTable.TABLE_NAME)
+                .ifNotExists()
+                .addPartitionKey(CassandraMessageTable.MAILBOX_ID, uuid())
+                .addClusteringColumn(CassandraMessageTable.IMAP_UID, bigint())
+                .addColumn(CassandraMessageTable.INTERNAL_DATE, timestamp())
+                .addColumn(CassandraMessageTable.BODY_START_OCTET, cint())
+                .addColumn(CassandraMessageTable.BODY_OCTECTS, cint())
+                .addColumn(CassandraMessageTable.TEXTUAL_LINE_COUNT, bigint())
+                .addColumn(CassandraMessageTable.MOD_SEQ, bigint())
+                .addColumn(CassandraMessageTable.MEDIA_TYPE, text())
+                .addColumn(CassandraMessageTable.SUB_TYPE, text())
+                .addColumn(CassandraMessageTable.FULL_CONTENT_OCTETS, cint())
+                .addColumn(CassandraMessageTable.BODY_CONTENT, blob())
+                .addColumn(CassandraMessageTable.HEADER_CONTENT, blob())
+                .addColumn(CassandraMessageTable.Flag.ANSWERED, cboolean())
+                .addColumn(CassandraMessageTable.Flag.DELETED, cboolean())
+                .addColumn(CassandraMessageTable.Flag.DRAFT, cboolean())
+                .addColumn(CassandraMessageTable.Flag.FLAGGED, cboolean())
+                .addColumn(CassandraMessageTable.Flag.RECENT, cboolean())
+                .addColumn(CassandraMessageTable.Flag.SEEN, cboolean())
+                .addColumn(CassandraMessageTable.Flag.USER, cboolean())
+                .addColumn(CassandraMessageTable.FLAG_VERSION, bigint())),
+        Subscription(CassandraSubscriptionTable.TABLE_NAME,
+            SchemaBuilder.createTable(CassandraSubscriptionTable.TABLE_NAME)
+                .ifNotExists()
+                .addPartitionKey(CassandraSubscriptionTable.MAILBOX, text())
+                .addClusteringColumn(CassandraSubscriptionTable.USER, text())
+        ),
+        Acl(CassandraACLTable.TABLE_NAME,
+            SchemaBuilder.createTable(CassandraACLTable.TABLE_NAME)
+                .ifNotExists()
+                .addPartitionKey(CassandraACLTable.ID, uuid())
+                .addColumn(CassandraACLTable.ACL, text())
+                .addColumn(CassandraACLTable.VERSION, bigint())
+        )
+        ;
+        private Create createStatement;
+        private String name;
+
+        TABLE(String name, Create createStatement) {
+            this.createStatement = createStatement;
+            this.name = name;
+        }
+    }
+
+    enum INDEX {
+        MailboxPath(SchemaBuilder.createIndex(CassandraMailboxTable.TABLE_NAME)
+            .ifNotExists()
+            .onTable(CassandraMailboxTable.TABLE_NAME)
+            .andColumn(CassandraMailboxTable.PATH));
+        private SchemaStatement createIndexStatement;
+
+        INDEX(SchemaStatement createIndexStatement) {
+            this.createIndexStatement = createIndexStatement;
+        }
+    }
+
+    public CassandraTableManager(Session session) {
+        this.session = session;
+    }
+
+    public CassandraTableManager ensureAllTables() {
+        Arrays.asList(TABLE.values())
+            .forEach(
+                (table) -> session.execute(table.createStatement)
+            );
+        Arrays.asList(INDEX.values())
+            .forEach(
+                (table) -> session.execute(table.createIndexStatement)
+            );
+        return this;
+    }
+
+    public void clearAllTables() {
+        Arrays.asList(TABLE.values())
+            .forEach(
+                (table) -> clearTable(table.name)
+            );
+    }
+
+    private void clearTable(String tableName) {
+        session.execute(QueryBuilder.truncate(tableName));
+    }
+}
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ClusterFactory.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ClusterFactory.java
new file mode 100644
index 0000000..4d68079
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ClusterFactory.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+public class ClusterFactory {
+
+    public static class CassandraServer {
+        private String ip;
+        private int port;
+
+        public CassandraServer(String ip, int port) {
+            this.ip = ip;
+            this.port = port;
+        }
+    }
+
+    private final static String DEFAULT_CLUSTER_IP = "localhost";
+    private final static int DEFAULT_CLUSTER_PORT = 9042;
+
+    public Cluster createClusterForClusterWithPassWord(List<CassandraServer> servers, String userName, String password) {
+        Cluster.Builder clusterBuilder = Cluster.builder();
+        servers.forEach(
+            (server) -> clusterBuilder.addContactPoint(server.ip).withPort(server.port)
+        );
+        if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
+            clusterBuilder.withCredentials(userName, password);
+        }
+        return clusterBuilder.build();
+    }
+
+    public Cluster createClusterForClusterWithoutPassWord(List<CassandraServer> servers) {
+        return createClusterForClusterWithPassWord(servers, null, null);
+    }
+    
+    public Cluster createClusterForSingleServerWithPassWord(String ip, int port, String userName, String password) {
+        return createClusterForClusterWithPassWord(ImmutableList.of(new CassandraServer(ip, port)), userName, password);
+    }
+
+    public Cluster createClusterForSingleServerWithoutPassWord(String ip, int port) {
+        return createClusterForClusterWithPassWord(ImmutableList.of(new CassandraServer(ip, port)), null, null);
+    }
+
+    public Cluster createDefaultSession() {
+        return createClusterForSingleServerWithoutPassWord(DEFAULT_CLUSTER_IP, DEFAULT_CLUSTER_PORT);
+    }
+}
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ClusterWithKeyspaceCreatedFactory.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ClusterWithKeyspaceCreatedFactory.java
new file mode 100644
index 0000000..fcee8ee
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ClusterWithKeyspaceCreatedFactory.java
@@ -0,0 +1,32 @@
+package org.apache.james.mailbox.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+
+public class ClusterWithKeyspaceCreatedFactory {
+
+    private final static int DEFAULT_REPLICATION_FACTOR = 1;
+
+    public Cluster clusterWithInitializedKeyspace(Cluster cluster, String keyspace, int replicationFactor) {
+        if (isKeyspacePresent(cluster, keyspace)) {
+            createKeyspace(cluster, keyspace, replicationFactor);
+        }
+        return cluster;
+    }
+
+    public Cluster clusterWithInitializedKeyspace(Cluster cluster, String keyspace) {
+        return clusterWithInitializedKeyspace(cluster, keyspace, DEFAULT_REPLICATION_FACTOR);
+    }
+
+    private boolean isKeyspacePresent(Cluster cluster, String keyspace) {
+        return cluster.getMetadata().getKeyspace(keyspace) == null;
+    }
+
+    private void createKeyspace(Cluster cluster, String keyspace, int replicationFactor) {
+        try (Session session = cluster.connect()) {
+            session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace
+                + " WITH replication = {'class':'SimpleStrategy', 'replication_factor':" + replicationFactor + "};");
+        }
+    }
+
+}
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/SessionFactory.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/SessionFactory.java
new file mode 100644
index 0000000..06cce8d
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/SessionFactory.java
@@ -0,0 +1,39 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+
+public class SessionFactory {
+    private final static String DEFAULT_KEYSPACE_NAME = "apache_james";
+
+    public Session createSession(Cluster cluster, String keyspace) {
+        Session session = cluster.connect(keyspace);
+        new CassandraTableManager(session)
+            .ensureAllTables();
+        return session;
+    }
+
+    public Session createSession(Cluster cluster) {
+        return createSession(cluster, DEFAULT_KEYSPACE_NAME);
+    }
+
+}
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
index 8a84c2f..a7c80b1 100644
--- a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
@@ -25,7 +25,7 @@ import com.datastax.driver.core.Session;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import org.apache.james.mailbox.cassandra.CassandraSession;
+import org.apache.james.mailbox.cassandra.CassandraConstants;
 import org.apache.james.mailbox.cassandra.mail.utils.SimpleMailboxACLJsonConverter;
 import org.apache.james.mailbox.cassandra.mail.utils.FunctionRunnerWithRetry;
 import org.apache.james.mailbox.cassandra.table.CassandraACLTable;
@@ -92,7 +92,7 @@ public class CassandraACLMapper {
                         .map((x) -> x.apply(command))
                         .map(this::updateStoredACL)
                         .orElseGet(() -> insertACL(applyCommandOnEmptyACL(command)));
-                return resultSet.one().getBool(CassandraSession.LIGHTWEIGHT_TRANSACTION_APPLIED);
+                return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
             }
         );
     }
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FunctionRunnerWithRetry.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FunctionRunnerWithRetry.java
index 64c35fc..ee8a1ce 100644
--- a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FunctionRunnerWithRetry.java
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FunctionRunnerWithRetry.java
@@ -22,11 +22,17 @@ package org.apache.james.mailbox.cassandra.mail.utils;
 import com.google.common.base.Preconditions;
 import org.apache.james.mailbox.exception.MailboxException;
 
+import java.util.Optional;
 import java.util.function.BooleanSupplier;
 import java.util.stream.IntStream;
 
-public class FunctionRunnerWithRetry {
-    
+public class FunctionRunnerWithRetry<Id> {
+
+    @FunctionalInterface
+    public interface OptionalSupplier<Id> {
+        Optional<Id> getAsOptional();
+    }
+
     private final int maxRetry;
 
     public FunctionRunnerWithRetry(int maxRetry) {
@@ -36,10 +42,17 @@ public class FunctionRunnerWithRetry {
 
     public void execute(BooleanSupplier functionNotifyingSuccess) throws MailboxException {
         IntStream.range(0, maxRetry)
-            .filter(
-                (x) -> functionNotifyingSuccess.getAsBoolean()
-            ).findFirst()
+            .filter((x) -> functionNotifyingSuccess.getAsBoolean())
+            .findFirst()
             .orElseThrow(() -> new MailboxException("Can not execute Boolean Supplier."));
     }
-    
+
+    public Id executeAndRetrieveObject(OptionalSupplier<Id> functionNotifyingSuccess) throws MailboxException {
+        return IntStream.range(0, maxRetry)
+            .mapToObj((x) -> functionNotifyingSuccess.getAsOptional())
+            .filter(Optional::isPresent)
+            .findFirst()
+            .orElseThrow(() -> new MailboxException("Can not execute Optional Supplier. " + maxRetry + " retries."))
+            .get();
+    }
 }
diff --git a/cassandra/src/main/resources/META-INF/spring/mailbox-cassandra.xml b/cassandra/src/main/resources/META-INF/spring/mailbox-cassandra.xml
index 6e803aa..026a622 100644
--- a/cassandra/src/main/resources/META-INF/spring/mailbox-cassandra.xml
+++ b/cassandra/src/main/resources/META-INF/spring/mailbox-cassandra.xml
@@ -50,17 +50,31 @@
         <constructor-arg index="0" ref="cassandra-session"/>
     </bean>
 
+    <alias name="jvm-locker" alias="cassandra-locker"/>
+
     <!--
-      The parameters are : the IP of a Cassendra cluster, the port, the keyspace and the replication factor
-      Default values are : localhost, 9042, apache_james and 1
+      The Cluster factory is responsible for connecting the cluster
+
+      The ClusterWithKeyspaceCreatedFactory is responsible for creating the keyspace if not present.
+
+      The SessionFactory is responsible for giving a session we can work with
     -->
-    <bean id="cassandra-session" class="org.apache.james.mailbox.cassandra.CassandraSession">
+    <bean id="cassandra-cluster" class="org.apache.james.mailbox.cassandra.ClusterFactory" factory-method="createClusterForSingleServerWithoutPassWord">
         <constructor-arg index="0" value="localhost"/>
         <constructor-arg index="1" value="9042" type="int"/>
-        <constructor-arg index="2" value="apache_james"/>
-        <constructor-arg index="3" value="1" type="int"/>
     </bean>
 
-    <alias name="jvm-locker" alias="cassandra-locker"/>
+    <bean id="cassandra-cluster-initialized" class="org.apache.james.mailbox.cassandra.ClusterWithKeyspaceCreatedFactory" factory-method="clusterWithInitializedKeyspace">
+        <constructor-arg index="0" ref="cassandra-cluster"/>
+        <constructor-arg index="1" value="apache_james"/>
+        <constructor-arg index="2" value="1" type="int"/>
+    </bean>
+
+    <bean id="cassandra-session" class="org.apache.james.mailbox.cassandra.SessionFactory" factory-method="">
+        <constructor-arg index="0" ref="cassandra-cluster-initialized"/>
+        <constructor-arg index="1" value="apache_james"/>
+    </bean>
+
+
 
 </beans>
diff --git a/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java b/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
index d7eb6e0..ada793b 100644
--- a/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
+++ b/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
@@ -18,23 +18,33 @@
  ****************************************************************/
 package org.apache.james.mailbox.cassandra;
 
-import org.apache.commons.lang.NotImplementedException;
+
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.google.common.base.Throwables;
+import org.apache.james.mailbox.cassandra.mail.utils.FunctionRunnerWithRetry;
 import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
+
 /**
  * Class that will creates a single instance of Cassandra session.
  */
 public final class CassandraClusterSingleton {
-    private final static String CLUSTER_IP = "localhost";
-    private final static int CLUSTER_PORT_TEST = 9142;
-    private final static String KEYSPACE_NAME = "apache_james";
-    private final static int DEFAULT_REPLICATION_FACTOR = 1;
+    private static final String CLUSTER_IP = "localhost";
+    private static final int CLUSTER_PORT_TEST = 9142;
+    private static final String KEYSPACE_NAME = "apache_james";
+    private static final int REPLICATION_FACTOR = 1;
+
+    private static final long SLEEP_BEFORE_RETRY = 200;
+    private static final int MAX_RETRY = 200;
 
     private static final Logger LOG = LoggerFactory.getLogger(CassandraClusterSingleton.class);
     private static CassandraClusterSingleton cluster = null;
-    private CassandraSession session;
+    private Session session;
 
     /**
      * Builds a MiniCluster instance.
@@ -51,89 +61,45 @@ public final class CassandraClusterSingleton {
     }
 
     private CassandraClusterSingleton() throws RuntimeException {
-       try {
+        try {
             EmbeddedCassandraServerHelper.startEmbeddedCassandra();
-            // Let Cassandra initialization before creating
-            // the session. Solve very fast computer tests run.
-            Thread.sleep(2000);
-            this.session = new CassandraSession(CLUSTER_IP, CLUSTER_PORT_TEST, KEYSPACE_NAME, DEFAULT_REPLICATION_FACTOR);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+            session = new FunctionRunnerWithRetry<Session>(MAX_RETRY)
+                .executeAndRetrieveObject(CassandraClusterSingleton.this::tryInitializeSession);
+        } catch(Exception exception) {
+            Throwables.propagate(exception);
         }
     }
 
-    /**
-     * Return a configuration for the runnning MiniCluster.
-     *
-     * @return
-     */
-    public CassandraSession getConf() {
+    public Session getConf() {
         return session;
     }
 
-    /**
-     * Create a specific table.
-     *
-     * @param tableName
-     *            the table name
-     */
-    public void ensureTable(String tableName) {
-        if (tableName.equals("mailbox")) {
-            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".mailbox (" + "id uuid PRIMARY KEY," + "name text, namespace text," + "uidvalidity bigint," + "user text," + "path text" + ");");
-
-            session.execute("CREATE INDEX IF NOT EXISTS ON " + session.getLoggedKeyspace() + ".mailbox(path);");
-        } else if (tableName.equals("messageCounter")) {
-            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".messageCounter (" + "mailboxId UUID PRIMARY KEY," + "nextUid bigint," + ");");
-        } else if (tableName.equals("mailboxCounters")) {
-            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".mailboxCounters (" + "mailboxId UUID PRIMARY KEY," + "count counter," + "unseen counter," + "nextModSeq counter" + ");");
-        } else if (tableName.equals("message")) {
-            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".message (" + "mailboxId UUID," + "uid bigint," + "internalDate timestamp," + "bodyStartOctet int," + "content blob," + "modSeq bigint," + "mediaType text," + "subType text," + "fullContentOctets int,"
-                    + "bodyOctets int," + "textualLineCount bigint," + "bodyContent blob," + "headerContent blob," + "flagAnswered boolean," + "flagDeleted boolean," + "flagDraft boolean," + "flagRecent boolean," + "flagSeen boolean," + "flagFlagged boolean," + "flagUser boolean,"
-                    + "flagVersion bigint,"+ "PRIMARY KEY (mailboxId, uid)" + ");");
-        } else if (tableName.equals("subscription")) {
-            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".subscription (" + "user text," + "mailbox text," + "PRIMARY KEY (mailbox, user)" + ");");
-        } else if (tableName.equals("quota")) {
-            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".quota ("
-                    + "user text PRIMARY KEY,"
-                    + "size_quota counter,"
-                    + "count_quota counter"
-                    + ");");
-        }  else if (tableName.equals("acl")) {
-            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".acl (id uuid PRIMARY KEY, acl text, version bigint);");
-        } else {
-            throw new NotImplementedException("We don't support the class " + tableName);
-        }
+    public void ensureAllTables() {
+        new CassandraTableManager(session).ensureAllTables();
     }
 
-    /**
-     * Ensure all tables
-     */
-    public void ensureAllTables() {
-        ensureTable("mailbox");
-        ensureTable("mailboxCounters");
-        ensureTable("message");
-        ensureTable("subscription");
-        ensureTable("acl");
+    public void clearAllTables() {
+        new CassandraTableManager(session).clearAllTables();
     }
 
-    /**
-     * Delete all rows from specified table.
-     * 
-     * @param tableName
-     */
-    public void clearTable(String tableName) {
-        session.execute("TRUNCATE " + tableName + ";");
+    private Optional<Session> tryInitializeSession() {
+        try {
+            Cluster cluster = new ClusterFactory().createClusterForSingleServerWithoutPassWord(CLUSTER_IP, CLUSTER_PORT_TEST);
+            Cluster clusterWithInitializedKeyspace = new ClusterWithKeyspaceCreatedFactory()
+                .clusterWithInitializedKeyspace(cluster, KEYSPACE_NAME, REPLICATION_FACTOR);
+            return Optional.of(new SessionFactory().createSession(clusterWithInitializedKeyspace, KEYSPACE_NAME));
+        } catch (NoHostAvailableException exception) {
+            sleep(SLEEP_BEFORE_RETRY);
+            return Optional.empty();
+        }
     }
 
-    /**
-     * Delete all rows for all tables.
-     */
-    public void clearAllTables() {
-        clearTable("mailbox");
-        clearTable("mailboxCounters");
-        clearTable("message");
-        clearTable("subscription");
-        clearTable("acl");
+    private void sleep(long sleepMs) {
+        try {
+            Thread.sleep(sleepMs);
+        } catch(InterruptedException interruptedException) {
+            Throwables.propagate(interruptedException);
+        }
     }
 
 }
diff --git a/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java b/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
index 8ff72ed..51a16c6 100644
--- a/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
+++ b/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
@@ -22,7 +22,6 @@ import org.apache.james.mailbox.AbstractMailboxManagerTest;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
 import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
-import org.apache.james.mailbox.cassandra.table.CassandraMailboxTable;
 import org.apache.james.mailbox.exception.BadCredentialsException;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.store.JVMMailboxPathLocker;
@@ -36,7 +35,7 @@ import org.slf4j.LoggerFactory;
  */
 public class CassandraMailboxManagerTest extends AbstractMailboxManagerTest {
 
-    private static final CassandraClusterSingleton CLUSTER = CassandraClusterSingleton.build();
+    private static final CassandraClusterSingleton CASSANDRA = CassandraClusterSingleton.build();
 
     /**
      * Setup the mailboxManager.
@@ -45,8 +44,8 @@ public class CassandraMailboxManagerTest extends AbstractMailboxManagerTest {
      */
     @Before
     public void setup() throws Exception {
-        CLUSTER.ensureAllTables();
-        CLUSTER.clearAllTables();
+        CASSANDRA.ensureAllTables();
+        CASSANDRA.clearAllTables();
         createMailboxManager();
     }
 
@@ -70,9 +69,9 @@ public class CassandraMailboxManagerTest extends AbstractMailboxManagerTest {
      */
     @Override
     protected void createMailboxManager() throws MailboxException {
-        final CassandraUidProvider uidProvider = new CassandraUidProvider(CLUSTER.getConf());
-        final CassandraModSeqProvider modSeqProvider = new CassandraModSeqProvider(CLUSTER.getConf());
-        final CassandraMailboxSessionMapperFactory mapperFactory = new CassandraMailboxSessionMapperFactory(uidProvider, modSeqProvider, (CassandraSession) CLUSTER.getConf());
+        final CassandraUidProvider uidProvider = new CassandraUidProvider(CASSANDRA.getConf());
+        final CassandraModSeqProvider modSeqProvider = new CassandraModSeqProvider(CASSANDRA.getConf());
+        final CassandraMailboxSessionMapperFactory mapperFactory = new CassandraMailboxSessionMapperFactory(uidProvider, modSeqProvider, CASSANDRA.getConf());
 
         final CassandraMailboxManager manager = new CassandraMailboxManager(mapperFactory, null, new JVMMailboxPathLocker());
         manager.init();
@@ -84,7 +83,7 @@ public class CassandraMailboxManagerTest extends AbstractMailboxManagerTest {
 
     private void deleteAllMailboxes() throws BadCredentialsException, MailboxException {
         MailboxSession session = getMailboxManager().createSystemSession("test", LoggerFactory.getLogger("Test"));
-        CLUSTER.clearTable(CassandraMailboxTable.TABLE_NAME);
+        CASSANDRA.clearAllTables();
         session.close();
     }
 }
diff --git a/pom.xml b/pom.xml
index 0016836..39f8262 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
         <junit.version>4.11</junit.version>
         <jasypt.version>1.9.0</jasypt.version>
         <guava.version>13.0</guava.version>
-        <cassandra-driver-core.version>2.0.1</cassandra-driver-core.version>
+        <cassandra-driver-core.version>2.1.5</cassandra-driver-core.version>
         <cassandra-unit.version>2.0.2.1</cassandra-unit.version>
         <assertj.version>2.0.0</assertj.version>
         <jackson-databinding.version>2.3.3</jackson-databinding.version>
-- 
2.4.0

