Details
Improvement
Status: Open
Normal
Resolution: Unresolved
Description
When I bulkload SSTables created with CQLSSTableWriter, it is sometimes very slow.
The CQLSSTableWriter is configured with withBufferSizeInMB set to 32 MB.
Two nodes write the SSTables and bulkload them:
1. Use CQLSSTableWriter to create the SSTables (60 threads).
2. When a directory holds more than 100000 rows, bulkload that directory (20 threads).
The normal bulkload speed is about 70 MB/s per node, and bulkloading 141 GB of SSTables per node takes 90 minutes. Sometimes, however, it is very slow: the same data takes 4 hours. Why?
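For reference, the two run times above can be turned into average end-to-end throughput figures. The sketch below assumes 1 GB = 1024 MB; the class and method names are made up for illustration. It works out to roughly 26.7 MB/s for the 90-minute run and roughly 10 MB/s for the 4-hour run. These are averages over the whole job, so they need not match the quoted streaming rate.

```java
import java.util.Locale;

public class ThroughputCheck {
    /** Average throughput in MB/s for a volume in GB and a duration in minutes (assumes 1 GB = 1024 MB). */
    public static double mbPerSecond(double gigabytes, double minutes) {
        return gigabytes * 1024.0 / (minutes * 60.0);
    }

    public static void main(String[] args) {
        // 141 GB per node, as reported in the description
        System.out.printf(Locale.ROOT, "normal run (90 min): %.1f MB/s%n", mbPerSecond(141, 90));
        System.out.printf(Locale.ROOT, "slow run (240 min):  %.1f MB/s%n", mbPerSecond(141, 240));
    }
}
```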
Here is the code that bulkloads the SSTables:
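import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.cassandra.service.StorageServiceMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Note: Timer is a project-local helper (java.util.Timer has no start()/end()),
// and Path is presumably org.apache.hadoop.fs.Path; neither import is shown here.
public class JmxBulkLoader {
    static final Logger LOGGER = LoggerFactory.getLogger(JmxBulkLoader.class);

    private JMXConnector connector;
    private StorageServiceMBean storageBean;
    private Timer timer = new Timer();

    public JmxBulkLoader(String host, int port) throws Exception {
        connect(host, port);
    }

    // Opens a JMX connection to the node and obtains a proxy for its StorageService MBean.
    private void connect(String host, int port) throws IOException, MalformedObjectNameException {
        JMXServiceURL jmxUrl = new JMXServiceURL(
                String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port));
        Map<String, Object> env = new HashMap<String, Object>();
        connector = JMXConnectorFactory.connect(jmxUrl, env);
        MBeanServerConnection mbeanServerConn = connector.getMBeanServerConnection();
        ObjectName name = new ObjectName("org.apache.cassandra.db:type=StorageService");
        storageBean = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
    }

    public void close() throws IOException {
        connector.close();
    }

    // Asks the node to bulkload the SSTables under the given directory and logs the elapsed time.
    public void bulkLoad(String path) {
        LOGGER.info("begin load data to cassandra " + new Path(path).getName());
        timer.start();
        storageBean.bulkLoad(path);
        timer.end();
        LOGGER.info("bulk load took " + timer.getTimeTakenMillis() + "ms, path: " + new Path(path).getName());
    }
}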
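The Timer used in JmxBulkLoader is not shown in this report, and it cannot be java.util.Timer, which has no start() or end() methods. A minimal sketch of what such a helper might look like follows. The class name SimpleTimer and the use of System.nanoTime() are assumptions for illustration; only the three method names come from the code above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the reporter's Timer helper; not the original class.
public class SimpleTimer {
    private long startNanos;
    private long endNanos;

    /** Records the start of the measured interval. */
    public void start() {
        startNanos = System.nanoTime();
        endNanos = startNanos;
    }

    /** Records the end of the measured interval. */
    public void end() {
        endNanos = System.nanoTime();
    }

    /** Elapsed time between the last start() and end() calls, in whole milliseconds. */
    public long getTimeTakenMillis() {
        return TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleTimer timer = new SimpleTimer();
        timer.start();
        Thread.sleep(50); // placeholder for the work being timed
        timer.end();
        System.out.println("took " + timer.getTimeTakenMillis() + "ms");
    }
}
```

System.nanoTime() is monotonic, so a measurement like this is not affected by wall-clock adjustments during a long bulkload.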
Here is the bulkload thread:
import java.io.IOException;

public class BulkThread implements Runnable {
    private String path;
    private String jmxHost;
    private int jmxPort;

    public BulkThread(String path, String jmxHost, int jmxPort) {
        super();
        this.path = path;
        this.jmxHost = jmxHost;
        this.jmxPort = jmxPort;
    }

    // Opens its own JMX connection, bulkloads one directory, then closes the connection.
    @Override
    public void run() {
        JmxBulkLoader bulkLoader = null;
        try {
            bulkLoader = new JmxBulkLoader(jmxHost, jmxPort);
            bulkLoader.bulkLoad(path);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (bulkLoader != null) {
                try {
                    bulkLoader.close();
                    bulkLoader = null;
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
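The report does not show how the 20 bulkload threads are started. One possible arrangement, sketched below, is a fixed-size thread pool with one task per SSTable directory. The class BulkLoadDriver, the loadAll method, and the directory names are hypothetical; the placeholder task only prints the path so that the example runs without a Cassandra node.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical driver; not part of the original report.
public class BulkLoadDriver {
    /**
     * Runs one task per directory on a pool of the given size and waits for the pool to drain.
     * Returns true if every task finished before the timeout elapsed.
     */
    public static boolean loadAll(List<String> paths, int threads,
                                  Function<String, Runnable> taskFactory,
                                  long timeout, TimeUnit unit) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String path : paths) {
            pool.submit(taskFactory.apply(path));
        }
        pool.shutdown(); // accept no new tasks, let the queued ones finish
        return pool.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        // Made-up directory names, for illustration only
        List<String> paths = List.of("/data/out/batch-0001", "/data/out/batch-0002");
        boolean finished = loadAll(paths, 20,
                path -> () -> System.out.println("would bulkload " + path),
                1, TimeUnit.HOURS);
        System.out.println("all tasks finished: " + finished);
    }
}
```

With the classes from this report, the task factory could be something like `path -> new BulkThread(path, jmxHost, jmxPort)`, where jmxHost and jmxPort point at the target node's JMX endpoint.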