Uploaded image for project: 'Flume'
  1. Flume
  2. FLUME-3024

TestThriftRpcClient.testMultipleThreads is not really multipleThreads

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.6.0
    • None
    • Test
    • None

    Description

      ThriftRpcClient client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("172.17.0.12", 4444, 10);
      		int threadCount = 100;
      		ExecutorService submissionSvc = Executors.newFixedThreadPool(threadCount);
      		ArrayList<Future<?>> futures = new ArrayList<Future<?>>(threadCount);
      		for (int i = 0; i < threadCount; i++) {
      			futures.add(submissionSvc.submit(new Runnable() {
      				public void run() {
      					try {
      						insertAsBatch(client, 0, 9);
      					} catch (Exception e) {
      						e.printStackTrace(); // To change body of catch
      												// statement use
      						// File | Settings | File Templates.
      					}
      				}
      			}));
      		}
      		for (int i = 0; i < threadCount; i++) {
      			futures.get(i).get();
      		}
      

      Although insertAsBatch is submit to a Thread pool, but the true insert occur when futures.get.get();

      this is not a synchronized action,so the insert is a synchronized too;

      when changing to

      submissionSvc.submit(new Runnable() {
      				public void run() {
      					try {
      						insertAsBatch(client, 0, 9);
      					} catch (Exception e) {
      						e.printStackTrace(); 
      					}
      				}
      			}).get()
      

      Exception occur.
      May be a real test is like this

      package thrift;
      
      import java.nio.ByteBuffer;
      import java.util.HashMap;
      import java.util.Map;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.atomic.AtomicInteger;
      
      import org.apache.thrift.TException;
      import org.apache.thrift.protocol.TCompactProtocol;
      import org.apache.thrift.transport.TFastFramedTransport;
      import org.apache.thrift.transport.TSocket;
      import org.apache.thrift.transport.TTransport;
      import org.apache.thrift.transport.TTransportException;
      import org.apache.tomcat.jni.Thread;
      import org.junit.internal.runners.statements.RunAfters;
      
      public class Client {
      
      	static AtomicInteger i = new AtomicInteger(0);
      
      	static ThreadLocal<ThriftSourceProtocol.Client> l = new ThreadLocal<>();
      
      	public static void main(String[] args) throws TException {
      		ExecutorService submissionSvc = Executors.newFixedThreadPool(10);
      		Map<String, String> map = new HashMap<>();
      		for (int j = 0; j < 1000; j++) {
      			submissionSvc.submit(new Runnable() {
      				@Override
      				public void run() {
      					if (l.get() == null) {
      						try {
      							l.set(create());
      						} catch (TTransportException e) {
      							e.printStackTrace();
      						}
      					}
      					try {
      						int k = i.incrementAndGet();
      						Status s = l.get().append(new ThriftFlumeEvent(map,
      								ByteBuffer.wrap(String.valueOf(java.lang.Thread.currentThread().getName()).getBytes())));
      						System.out.println(k+ "****" + java.lang.Thread.currentThread().getName());
      					} catch (Exception e) {
      						l.remove();
      						e.printStackTrace();
      					}
      				}
      
      				private thrift.ThriftSourceProtocol.Client create() throws TTransportException {
      					TSocket tt = new TSocket("172.17.0.12", 4444);
      					TTransport transport = new TFastFramedTransport(tt);
      					ThriftSourceProtocol.Client client = new ThriftSourceProtocol.Client(
      							new TCompactProtocol(transport));
      					transport.open();
      					return client;
      				}
      			});
      		}
      	}
      }
      

      we must make sure each thread has it's own connection,so use threadlocal

      Attachments

        Activity

          People

            Unassigned Unassigned
            licl lichenglin
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: