Cassandra / CASSANDRA-13266

Why is bulk loading sometimes very slow?


Details

    • Type: Improvement
    • Status: Open
    • Priority: Normal
    • Resolution: Unresolved
    • None
    • Component/s: Legacy/Tools
    • None

    Description

      When I bulk load SSTables created with CQLSSTableWriter, the load is sometimes very slow.
      CQLSSTableWriter is configured with withBufferSizeInMB(32).
      Two nodes are used to write the SSTables and to bulk load them:
      1. Use CQLSSTableWriter to create SSTables (60 threads).
      2. When a directory exceeds 100,000 rows, bulk load that directory (20 threads).
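Step 2 amounts to a hand-off between writer threads and a loader pool. The sketch that follows is a minimal, stdlib-only illustration under assumptions that are not in the report: each writer thread owns one `DirectoryRotator` (a hypothetical class), the 100,000-row threshold is counted per output directory, and the `loader` callback is a placeholder for the JMX `bulkLoad` call. The actual CQLSSTableWriter calls are left out.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: one writer thread counts the rows it writes and, once a directory
// reaches ROWS_PER_DIRECTORY rows, hands that directory to a shared bulk-load pool.
public class DirectoryRotator {
    static final int ROWS_PER_DIRECTORY = 100_000;

    private final Path baseDir;
    private final String writerId;
    private final ExecutorService loadPool;
    private final Consumer<Path> loader; // placeholder for the JMX bulkLoad call
    private int rowsInCurrentDir = 0;
    private int dirIndex = 0;

    DirectoryRotator(Path baseDir, String writerId, ExecutorService loadPool, Consumer<Path> loader) {
        this.baseDir = baseDir;
        this.writerId = writerId;
        this.loadPool = loadPool;
        this.loader = loader;
    }

    // Directory the writer should currently be writing SSTables into.
    Path currentDir() {
        return baseDir.resolve(writerId + "-" + dirIndex);
    }

    // Call once after each row is added to the writer for currentDir().
    void onRowWritten() {
        rowsInCurrentDir++;
        if (rowsInCurrentDir >= ROWS_PER_DIRECTORY) {
            rotate();
        }
    }

    // Submits the current directory for loading and switches to a fresh one.
    // In a real pipeline the SSTable writer for that directory should be closed first,
    // so its files are complete before they are streamed.
    void rotate() {
        if (rowsInCurrentDir == 0) {
            return; // nothing written since the last rotation
        }
        Path finished = currentDir();
        loadPool.submit(() -> loader.accept(finished));
        dirIndex++;
        rowsInCurrentDir = 0;
    }

    // Simulates one writer producing totalRows rows; returns how many directories were loaded.
    static int demo(int totalRows) {
        ExecutorService pool = Executors.newFixedThreadPool(20);
        List<Path> loaded = Collections.synchronizedList(new ArrayList<>());
        DirectoryRotator writer =
                new DirectoryRotator(Paths.get("/tmp/sstables"), "writer-0", pool, loaded::add);
        for (int i = 0; i < totalRows; i++) {
            writer.onRowWritten();
        }
        writer.rotate(); // flush the last, partially filled directory
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return loaded.size();
    }

    public static void main(String[] args) {
        System.out.println(demo(250_000)); // prints 3 (100k + 100k + 50k rows)
    }
}
```

With 60 writer threads, each would hold its own rotator (distinct `writerId`) while all of them share the one 20-thread load pool.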
      The normal bulk load speed is about 70 MB/s per node, and bulk loading 141 GB of SSTables per node takes 90 minutes. Sometimes, however, it is very slow: the same data takes 4 hours. Why?
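For scale, the average rates implied by these figures can be worked out directly. This small helper assumes 1 GB = 1024 MB and that each duration covers all 141 GB on a node; `ThroughputCheck` is a hypothetical name used only for illustration.

```java
// Average throughput implied by the durations in the description (1 GB = 1024 MB assumed).
public class ThroughputCheck {
    // Average MB/s needed to move `gigabytes` in `minutes`.
    static double mbPerSecond(double gigabytes, double minutes) {
        return gigabytes * 1024.0 / (minutes * 60.0);
    }

    // Minutes needed to move `gigabytes` at a sustained `mbPerSecond`.
    static double minutesAt(double gigabytes, double mbPerSecond) {
        return gigabytes * 1024.0 / mbPerSecond / 60.0;
    }

    public static void main(String[] args) {
        System.out.printf("normal run: %.1f MB/s%n", mbPerSecond(141, 90));  // prints "normal run: 26.7 MB/s"
        System.out.printf("slow run: %.1f MB/s%n", mbPerSecond(141, 240));   // prints "slow run: 10.0 MB/s"
        System.out.printf("at 70 MB/s: %.1f min%n", minutesAt(141, 70));     // prints "at 70 MB/s: 34.4 min"
    }
}
```

Under those assumptions the normal run averages about 26.7 MB/s and the slow run about 10 MB/s, while a sustained 70 MB/s would move 141 GB in roughly 34 minutes, so the 70 MB/s figure and the 90-minute figure appear to measure different things (for example, a peak streaming rate versus end-to-end time).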
      Here is the code that bulk loads the SSTables:

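      import java.io.IOException;
      import java.nio.file.Paths;
      import java.util.HashMap;
      import java.util.Map;
      import java.util.concurrent.TimeUnit;
      
      import javax.management.JMX;
      import javax.management.MBeanServerConnection;
      import javax.management.MalformedObjectNameException;
      import javax.management.ObjectName;
      import javax.management.remote.JMXConnector;
      import javax.management.remote.JMXConnectorFactory;
      import javax.management.remote.JMXServiceURL;
      
      import org.apache.cassandra.service.StorageServiceMBean;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class JmxBulkLoader implements AutoCloseable {
      
      	private static final Logger LOGGER = LoggerFactory.getLogger(JmxBulkLoader.class);
      	private JMXConnector connector;
      	private StorageServiceMBean storageBean;
      
      	public JmxBulkLoader(String host, int port) throws IOException, MalformedObjectNameException {
      		connect(host, port);
      	}
      
      	// Opens a JMX/RMI connection to the node and creates a proxy for its StorageService MBean.
      	private void connect(String host, int port) throws IOException, MalformedObjectNameException {
      		JMXServiceURL jmxUrl = new JMXServiceURL(
      				String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port));
      		Map<String, Object> env = new HashMap<String, Object>();
      		connector = JMXConnectorFactory.connect(jmxUrl, env);
      		MBeanServerConnection mbeanServerConn = connector.getMBeanServerConnection();
      		ObjectName name = new ObjectName("org.apache.cassandra.db:type=StorageService");
      		storageBean = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
      	}
      
      	@Override
      	public void close() throws IOException {
      		connector.close();
      	}
      
      	// Asks the node to load the SSTables in `path` (a directory readable by the Cassandra node)
      	// and blocks until the load completes or fails.
      	public void bulkLoad(String path) {
      		String dirName = String.valueOf(Paths.get(path).getFileName());
      		LOGGER.info("begin load data to cassandra " + dirName);
      		long start = System.nanoTime();
      		try {
      			storageBean.bulkLoad(path);
      		} finally {
      			// Log the elapsed wall-clock time even if the load threw an exception.
      			long tookMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      			LOGGER.info("bulk load took " + tookMs + "ms, path: " + dirName);
      		}
      	}
      }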
      

      The bulk load thread:

       
      import java.io.IOException;
      
      // Loads one directory over its own JMX connection, so several directories can load in parallel.
      public class BulkThread implements Runnable {
      
      	private final String path;
      	private final String jmxHost;
      	private final int jmxPort;
      
      	public BulkThread(String path, String jmxHost, int jmxPort) {
      		this.path = path;
      		this.jmxHost = jmxHost;
      		this.jmxPort = jmxPort;
      	}
      
      	@Override
      	public void run() {
      		JmxBulkLoader bulkLoader = null;
      		try {
      			bulkLoader = new JmxBulkLoader(jmxHost, jmxPort);
      			bulkLoader.bulkLoad(path);
      		} catch (Exception e) {
      			e.printStackTrace();
      		} finally {
      			// Always release the JMX connection, even if the load failed.
      			if (bulkLoader != null) {
      				try {
      					bulkLoader.close();
      				} catch (IOException e) {
      					e.printStackTrace();
      				}
      			}
      		}
      	}
      }
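To narrow down which loads are slow, it can help to time each directory individually rather than only the overall run. A minimal sketch, assuming a hypothetical `BulkLoadDriver` class and a `load` callback standing in for `new BulkThread(...)` / `JmxBulkLoader.bulkLoad`, runs each directory on a fixed pool (20 threads in the description) and records per-directory wall-clock time:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical driver: loads every directory on a fixed-size pool and records how long each took.
public class BulkLoadDriver {

    static Map<String, Long> loadAll(List<String> dirs, int threads, Consumer<String> load) {
        Map<String, Long> millis = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String dir : dirs) {
            pool.submit(() -> {
                long start = System.nanoTime();
                try {
                    load.accept(dir); // placeholder for the JMX bulk load of `dir`
                } finally {
                    // Record the duration even when the load throws.
                    millis.put(dir, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); // wait for every load to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // result may be incomplete if interrupted
        }
        return millis;
    }

    public static void main(String[] args) {
        List<String> dirs = List.of("dir-0", "dir-1", "dir-2");
        Map<String, Long> took = loadAll(dirs, 20, dir -> { /* bulk load `dir` here */ });
        // Print slowest directories first.
        took.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
                .forEach(e -> System.out.println(e.getKey() + " took " + e.getValue() + " ms"));
    }
}
```

Sorting by duration makes it easier to see whether a slow run comes from a few outlier directories or from a uniform slowdown across all of them.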
      

          People

            Assignee: Unassigned
            Reporter: bruceliang liangsibin
            Votes: 0
            Watchers: 2
