Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.6.0
-
None
-
None
Description
ThriftRpcClient client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("172.17.0.12", 4444, 10); int threadCount = 100; ExecutorService submissionSvc = Executors.newFixedThreadPool(threadCount); ArrayList<Future<?>> futures = new ArrayList<Future<?>>(threadCount); for (int i = 0; i < threadCount; i++) { futures.add(submissionSvc.submit(new Runnable() { public void run() { try { insertAsBatch(client, 0, 9); } catch (Exception e) { e.printStackTrace(); // To change body of catch // statement use // File | Settings | File Templates. } } })); } for (int i = 0; i < threadCount; i++) { futures.get(i).get(); }
Although insertAsBatch is submit to a Thread pool, but the true insert occur when futures.get.get();
this is not a synchronized action,so the insert is a synchronized too;
when changing to
submissionSvc.submit(new Runnable() { public void run() { try { insertAsBatch(client, 0, 9); } catch (Exception e) { e.printStackTrace(); } } }).get()
Exception occur.
May be a real test is like this
package thrift; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.transport.TFastFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.tomcat.jni.Thread; import org.junit.internal.runners.statements.RunAfters; public class Client { static AtomicInteger i = new AtomicInteger(0); static ThreadLocal<ThriftSourceProtocol.Client> l = new ThreadLocal<>(); public static void main(String[] args) throws TException { ExecutorService submissionSvc = Executors.newFixedThreadPool(10); Map<String, String> map = new HashMap<>(); for (int j = 0; j < 1000; j++) { submissionSvc.submit(new Runnable() { @Override public void run() { if (l.get() == null) { try { l.set(create()); } catch (TTransportException e) { e.printStackTrace(); } } try { int k = i.incrementAndGet(); Status s = l.get().append(new ThriftFlumeEvent(map, ByteBuffer.wrap(String.valueOf(java.lang.Thread.currentThread().getName()).getBytes()))); System.out.println(k+ "****" + java.lang.Thread.currentThread().getName()); } catch (Exception e) { l.remove(); e.printStackTrace(); } } private thrift.ThriftSourceProtocol.Client create() throws TTransportException { TSocket tt = new TSocket("172.17.0.12", 4444); TTransport transport = new TFastFramedTransport(tt); ThriftSourceProtocol.Client client = new ThriftSourceProtocol.Client( new TCompactProtocol(transport)); transport.open(); return client; } }); } } }
we must make sure each thread has it's own connection,so use threadlocal