Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
-
None
-
None
Description
The following job runs in a single thread for one partition.
However, the two initializers — one for each input source operator — should run in two parallel threads.
@Test
public void testScanUnion() throws Exception {
JobSpecification spec = new JobSpecification();
IFileSplitProvider splitProvider1 = new ConstantFileSplitProvider(new FileSplit[]
{ new FileSplit(NC1_ID, new FileReference(new File("data/words.txt"))) });
IFileSplitProvider splitProvider2 = new ConstantFileSplitProvider(new FileSplit[]
{ new FileSplit(NC2_ID, new FileReference(new File("data/words.txt"))) });
RecordDescriptor desc = new RecordDescriptor(
new ISerializerDeserializer[]
);
FileScanOperatorDescriptor csvScanner1 = new FileScanOperatorDescriptor(
spec,
splitProvider1,
new DelimitedDataTupleParserFactory(new IValueParserFactory[]
','),
desc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner1, NC1_ID);
FileScanOperatorDescriptor csvScanner2 = new FileScanOperatorDescriptor(
spec,
splitProvider2,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }
,
','),
desc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner2, NC2_ID);
UnionAllOperatorDescriptor union = new UnionAllOperatorDescriptor(spec, 2, desc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, union, NC1_ID);
IConnectorDescriptor connScan1ToUnion = new OneToOneConnectorDescriptor(spec);
IConnectorDescriptor connScan2ToUnion = new OneToOneConnectorDescriptor(spec);
spec.connect(connScan1ToUnion, csvScanner1, 0, union, 0);
spec.connect(connScan2ToUnion, csvScanner2, 0, union, 1);
IOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
spec.connect(conn, union, 0, printer, 0);
spec.addRoot(printer);
runTest(spec);
}