diff --git data/files/kv1kv2.cogroup.txt data/files/kv1kv2.cogroup.txt index 6d36e22..8a98be3 100644 --- data/files/kv1kv2.cogroup.txt +++ data/files/kv1kv2.cogroup.txt @@ -1,1000 +1,1000 @@ -0val_0 -0val_0 -0val_0 -10 -10 -11 -0val_10 -110 -0val_100 -0val_100 -1100 -1100 -1101 -1102 -0val_103 -0val_103 -0val_104 -0val_104 -1104 -1104 -1104 -0val_105 -1105 -1105 -1106 -0val_11 -111 -111 -111 -1110 -0val_111 -0val_113 -0val_113 -0val_114 -1114 -1114 -1114 -0val_116 -1116 -1117 -1117 -0val_118 -0val_118 -1118 -1118 -1118 -0val_119 -0val_119 -0val_119 -1119 -1119 -1119 -0val_12 -0val_12 -112 -0val_120 -0val_120 -1120 -1121 -1121 -1122 -1122 -1122 -1123 -1123 -0val_125 -0val_125 -1125 -0val_126 -1126 -1126 -0val_128 -0val_128 -0val_128 -1128 -1128 -0val_129 -0val_129 -1129 -1129 -0val_131 -1132 -1132 -0val_133 -1133 -0val_134 -0val_134 -1134 -1135 -1135 -1135 -0val_136 -1136 -0val_137 -0val_137 -1137 -0val_138 -0val_138 -0val_138 -0val_138 -1138 -1138 -1140 -0val_143 -1143 -1144 -0val_145 -0val_146 -0val_146 -1147 -1147 -0val_149 -0val_149 -1149 -0val_15 -0val_15 -115 -115 -0val_150 -1151 -1151 -0val_152 -0val_152 -1152 -1152 -1152 -0val_153 -1153 -1153 -0val_155 -0val_156 -1156 -1156 -0val_157 -1157 -1157 -0val_158 -116 -116 -0val_160 -1160 -1161 -1161 -1161 -1161 -0val_162 -1162 -0val_163 -0val_164 -0val_164 -1164 -1164 -0val_165 -0val_165 -1165 -0val_166 -0val_167 -0val_167 -0val_167 -1167 -0val_168 -1168 -0val_169 -0val_169 -0val_169 -0val_169 -0val_17 -0val_170 -1170 -0val_172 -0val_172 -1172 -0val_174 -0val_174 -1174 -1174 -0val_175 -0val_175 -1175 -1175 -0val_176 -0val_176 -0val_177 -1177 -1177 -0val_178 -1178 -1178 -0val_179 -0val_179 -1179 -0val_18 -0val_18 -0val_180 -0val_181 -1182 -0val_183 -1183 -1184 -1185 -0val_186 -0val_187 -0val_187 -0val_187 -0val_189 -1189 -0val_19 -119 -0val_190 -0val_191 -0val_191 -1191 -0val_192 -1192 -0val_193 -0val_193 -0val_193 -0val_194 -0val_195 -0val_195 -0val_196 -1196 -1196 -1196 -0val_197 -0val_197 -1197 -0val_199 -0val_199 -0val_199 -1199 -0val_2 -12 -0val_20 -120 -120 -0val_200 -0val_200 -0val_201 -0val_202 -0val_203 -0val_203 -1204 -0val_205 -0val_205 -1205 -1206 -1206 -1206 -0val_207 -0val_207 -0val_208 -0val_208 -0val_208 -0val_209 -0val_209 -1209 -1209 -121 -121 -121 -121 -1212 -0val_213 -0val_213 -1213 -0val_214 -1215 -0val_216 -0val_216 -1216 -0val_217 -0val_217 -1217 -1217 -0val_218 -0val_219 -0val_219 -122 -0val_221 -0val_221 -0val_222 -1222 -0val_223 -0val_223 -0val_224 -0val_224 -1224 -0val_226 -1226 -1226 -1226 -1226 -1227 -0val_228 -1228 -0val_229 -0val_229 -123 -0val_230 -0val_230 -0val_230 -0val_230 -0val_230 -1231 -0val_233 -0val_233 -0val_235 -1235 -0val_237 -0val_237 -0val_238 -0val_238 -1238 -0val_239 -0val_239 -1239 -1239 -0val_24 -0val_24 -1240 -0val_241 -1241 -1241 -1241 -1241 -0val_242 -0val_242 -1242 -1243 -1243 -0val_244 -1244 -1244 -1244 -1245 -1245 -1246 -1246 -0val_247 -0val_248 -1248 -0val_249 -1249 -1249 -0val_252 -1252 -1254 -0val_255 -0val_255 -0val_256 -0val_256 -1256 -0val_257 -1257 -1257 -0val_258 -1258 -1259 -1259 -0val_26 -0val_26 -0val_260 -1260 -1260 -1261 -0val_262 -1262 -1262 -0val_263 -1264 -1264 -0val_265 -0val_265 -1265 -0val_266 -1267 -1268 -0val_27 -1271 -0val_272 -0val_272 -1272 -0val_273 -0val_273 -0val_273 -1273 -0val_274 -1274 -0val_275 -1275 -1275 -1276 -0val_277 -0val_277 -0val_277 -0val_277 -1277 -1277 -0val_278 -0val_278 -1278 -0val_28 -0val_280 -0val_280 -0val_281 -0val_281 -1281 -1281 -1281 -0val_282 -0val_282 -0val_283 -0val_284 -1284 -1284 -0val_285 -1285 -0val_286 -1286 -0val_287 -1287 -1287 -0val_288 -0val_288 -0val_289 -1289 -129 -129 -0val_291 -1291 -1291 -0val_292 -1292 -1292 -1293 -1293 -1295 -1295 -0val_296 -1296 -0val_298 -0val_298 -0val_298 -13 -0val_30 -130 -1300 -1300 -0val_302 -1302 -1303 -1303 -1304 -0val_305 -1305 -0val_306 -1306 -0val_307 -0val_307 -0val_308 -1308 -1308 -0val_309 -0val_309 -1309 -131 -0val_310 -1310 -1310 -1310 -0val_311 -0val_311 -0val_311 -1313 -1314 -0val_315 -0val_316 -0val_316 -0val_316 -0val_317 -0val_317 -1317 -0val_318 -0val_318 -0val_318 -1318 -132 -0val_321 -0val_321 -0val_322 -0val_322 -1322 -0val_323 -1323 -1324 -0val_325 -0val_325 -1326 -0val_327 -0val_327 -0val_327 -1328 -1328 -0val_33 -133 -1330 -0val_331 -0val_331 -1331 -1331 -0val_332 -0val_333 -0val_333 -1333 -1334 -0val_335 -1335 -1335 -0val_336 -1336 -1337 -0val_338 -1338 -0val_339 -0val_34 -1340 -0val_341 -1341 -1341 -1341 -0val_342 -0val_342 -1342 -1343 -0val_344 -0val_344 -1344 -0val_345 -1347 -1347 -0val_348 -0val_348 -0val_348 -0val_348 -0val_348 -1348 -1349 -1349 -1349 -1349 -0val_35 -0val_35 -0val_35 -135 -135 -135 -0val_351 -1351 -1351 -1352 -1352 -0val_353 -0val_353 -1353 -1355 -1355 -0val_356 -1356 -1356 -1358 -0val_360 -1360 -0val_362 -1363 -1363 -1363 -0val_364 -1364 -0val_365 -0val_366 -0val_367 -0val_367 -1367 -1367 -0val_368 -0val_369 -0val_369 -0val_369 -1369 -0val_37 -0val_37 -1371 -1371 -1371 -1371 -0val_373 -1373 -0val_374 -1374 -0val_375 -1375 -1375 -1375 -1375 -1375 -1376 -0val_377 -0val_378 -1378 -0val_379 -1379 -1381 -0val_382 -0val_382 -1382 -1382 -0val_384 -0val_384 -0val_384 -1384 -1384 -1384 -1385 -1385 -0val_386 -1386 -1386 -1388 -0val_389 -1389 -1389 -1390 -1390 -1390 -1391 -1391 -0val_392 -1392 -1392 -0val_393 -1393 -1393 -0val_394 -0val_395 -0val_395 -1395 -1395 -0val_396 -0val_396 -0val_396 -0val_397 -0val_397 -1398 -0val_399 -0val_399 -1399 -1399 -0val_4 -14 -140 -140 -0val_400 -0val_401 -0val_401 -0val_401 -0val_401 -0val_401 -1401 -0val_402 -1402 -1402 -1402 -0val_403 -0val_403 -0val_403 -0val_404 -0val_404 -1404 -1404 -1404 -1405 -0val_406 -0val_406 -0val_406 -0val_406 -1406 -0val_407 -1407 -1407 -1407 -1408 -1408 -0val_409 -0val_409 -0val_409 -1409 -1409 -0val_41 -1410 -0val_411 -1411 -1412 -1412 -0val_413 -0val_413 -1413 -0val_414 -0val_414 -1414 -1415 -1416 -0val_417 -0val_417 -0val_417 -0val_418 -0val_419 -0val_42 -0val_42 -142 -142 -142 -0val_421 -1421 -1421 -1423 -0val_424 -0val_424 -1424 -1425 -1426 -0val_427 -1427 -1427 -1428 -0val_429 -0val_429 -1429 -1429 -0val_43 -0val_430 -0val_430 -0val_430 -1430 -0val_431 -0val_431 -0val_431 -1431 -0val_432 -1432 -0val_435 -1435 -0val_436 -1436 -0val_437 -1437 -0val_438 -0val_438 -0val_438 -1438 -1438 -0val_439 -0val_439 -1439 -1439 -0val_44 -1440 -1440 -1441 -1442 -0val_443 -1443 -1443 -1443 -0val_444 -0val_446 -1446 -1446 -1447 -0val_448 -1448 -0val_449 -1450 -1450 -1451 -0val_452 -0val_453 -1453 -0val_454 -0val_454 -0val_454 -1454 -1454 -0val_455 -1455 -1455 -0val_457 -1457 -1457 -0val_458 -0val_458 -0val_459 -0val_459 -1459 -146 -0val_460 -1461 -0val_462 -0val_462 -1462 -0val_463 -0val_463 -1463 -0val_466 -0val_466 -0val_466 -0val_467 -1467 -0val_468 -0val_468 -0val_468 -0val_468 -1468 -1468 -1468 -0val_469 -0val_469 -0val_469 -0val_469 -0val_469 -1469 -0val_47 -147 -0val_470 -1470 -0val_472 -1473 -1474 -1474 -0val_475 -1475 -1476 -1476 -0val_477 -1477 -0val_478 -0val_478 -1478 -1478 -0val_479 -148 -148 -0val_480 -0val_480 -0val_480 -1480 -1480 -0val_481 -1481 -0val_482 -1482 -0val_483 -0val_484 -1484 -0val_485 -1485 -1485 -1486 -0val_487 -1487 -1488 -0val_489 -0val_489 -0val_489 -0val_489 -1489 -149 -149 -0val_490 -1490 -0val_491 -1491 -1491 -0val_492 -0val_492 -1492 -1492 -0val_493 -0val_494 -1494 -1494 -0val_495 -1495 -0val_496 -1496 -0val_497 -1497 -1497 -0val_498 -0val_498 -0val_498 -0val_5 -0val_5 -0val_5 -15 -150 -0val_51 -0val_51 -151 -152 -152 -152 -152 -0val_53 -153 -0val_54 -156 -0val_57 -0val_58 -0val_58 -158 -158 -159 -16 -16 -160 -161 -162 -162 -163 -0val_64 -0val_65 -165 -165 -0val_66 -0val_67 -0val_67 -168 -0val_69 -169 -0val_70 -0val_70 -0val_70 -170 -171 -0val_72 -0val_72 -0val_74 -175 -0val_76 -0val_76 -176 -176 -176 -0val_77 -177 -177 -0val_78 -178 -0val_8 -18 -0val_80 -180 -0val_82 -182 -182 -0val_83 -0val_83 -0val_84 -0val_84 -0val_85 -185 -0val_86 -186 -0val_87 -187 -187 -189 -189 -189 -0val_9 -0val_90 -0val_90 -0val_90 -191 -0val_92 -193 -193 -193 -194 -0val_95 -0val_95 -0val_96 -0val_97 -0val_97 -197 -197 -0val_98 -0val_98 -199 +val_0 +val_0 +val_0 +0 +0 +1 +val_10 +10 +val_100 +val_100 +100 +100 +101 +102 +val_103 +val_103 +val_104 +val_104 +104 +104 +104 +val_105 +105 +105 +106 +val_11 +11 +11 +11 +110 +val_111 +val_113 +val_113 +val_114 +114 +114 +114 +val_116 +116 +117 +117 +val_118 +val_118 +118 +118 +118 +val_119 +val_119 +val_119 +119 +119 +119 +val_12 +val_12 +12 +val_120 +val_120 +120 +121 +121 +122 +122 +122 +123 +123 +val_125 +val_125 +125 +val_126 +126 +126 +val_128 +val_128 +val_128 +128 +128 +val_129 +val_129 +129 +129 +val_131 +132 +132 +val_133 +133 +val_134 +val_134 +134 +135 +135 +135 +val_136 +136 +val_137 +val_137 +137 +val_138 +val_138 +val_138 +val_138 +138 +138 +140 +val_143 +143 +144 +val_145 +val_146 +val_146 +147 +147 +val_149 +val_149 +149 +val_15 +val_15 +15 +15 +val_150 +151 +151 +val_152 +val_152 +152 +152 +152 +val_153 +153 +153 +val_155 +val_156 +156 +156 +val_157 +157 +157 +val_158 +16 +16 +val_160 +160 +161 +161 +161 +161 +val_162 +162 +val_163 +val_164 +val_164 +164 +164 +val_165 +val_165 +165 +val_166 +val_167 +val_167 +val_167 +167 +val_168 +168 +val_169 +val_169 +val_169 +val_169 +val_17 +val_170 +170 +val_172 +val_172 +172 +val_174 +val_174 +174 +174 +val_175 +val_175 +175 +175 +val_176 +val_176 +val_177 +177 +177 +val_178 +178 +178 +val_179 +val_179 +179 +val_18 +val_18 +val_180 +val_181 +182 +val_183 +183 +184 +185 +val_186 +val_187 +val_187 +val_187 +val_189 +189 +val_19 +19 +val_190 +val_191 +val_191 +191 +val_192 +192 +val_193 +val_193 +val_193 +val_194 +val_195 +val_195 +val_196 +196 +196 +196 +val_197 +val_197 +197 +val_199 +val_199 +val_199 +199 +val_2 +2 +val_20 +20 +20 +val_200 +val_200 +val_201 +val_202 +val_203 +val_203 +204 +val_205 +val_205 +205 +206 +206 +206 +val_207 +val_207 +val_208 +val_208 +val_208 +val_209 +val_209 +209 +209 +21 +21 +21 +21 +212 +val_213 +val_213 +213 +val_214 +215 +val_216 +val_216 +216 +val_217 +val_217 +217 +217 +val_218 +val_219 +val_219 +22 +val_221 +val_221 +val_222 +222 +val_223 +val_223 +val_224 +val_224 +224 +val_226 +226 +226 +226 +226 +227 +val_228 +228 +val_229 +val_229 +23 +val_230 +val_230 +val_230 +val_230 +val_230 +231 +val_233 +val_233 +val_235 +235 +val_237 +val_237 +val_238 +val_238 +238 +val_239 +val_239 +239 +239 +val_24 +val_24 +240 +val_241 +241 +241 +241 +241 +val_242 +val_242 +242 +243 +243 +val_244 +244 +244 +244 +245 +245 +246 +246 +val_247 +val_248 +248 +val_249 +249 +249 +val_252 +252 +254 +val_255 +val_255 +val_256 +val_256 +256 +val_257 +257 +257 +val_258 +258 +259 +259 +val_26 +val_26 +val_260 +260 +260 +261 +val_262 +262 +262 +val_263 +264 +264 +val_265 +val_265 +265 +val_266 +267 +268 +val_27 +271 +val_272 +val_272 +272 +val_273 +val_273 +val_273 +273 +val_274 +274 +val_275 +275 +275 +276 +val_277 +val_277 +val_277 +val_277 +277 +277 +val_278 +val_278 +278 +val_28 +val_280 +val_280 +val_281 +val_281 +281 +281 +281 +val_282 +val_282 +val_283 +val_284 +284 +284 +val_285 +285 +val_286 +286 +val_287 +287 +287 +val_288 +val_288 +val_289 +289 +29 +29 +val_291 +291 +291 +val_292 +292 +292 +293 +293 +295 +295 +val_296 +296 +val_298 +val_298 +val_298 +3 +val_30 +30 +300 +300 +val_302 +302 +303 +303 +304 +val_305 +305 +val_306 +306 +val_307 +val_307 +val_308 +308 +308 +val_309 +val_309 +309 +31 +val_310 +310 +310 +310 +val_311 +val_311 +val_311 +313 +314 +val_315 +val_316 +val_316 +val_316 +val_317 +val_317 +317 +val_318 +val_318 +val_318 +318 +32 +val_321 +val_321 +val_322 +val_322 +322 +val_323 +323 +324 +val_325 +val_325 +326 +val_327 +val_327 +val_327 +328 +328 +val_33 +33 +330 +val_331 +val_331 +331 +331 +val_332 +val_333 +val_333 +333 +334 +val_335 +335 +335 +val_336 +336 +337 +val_338 +338 +val_339 +val_34 +340 +val_341 +341 +341 +341 +val_342 +val_342 +342 +343 +val_344 +val_344 +344 +val_345 +347 +347 +val_348 +val_348 +val_348 +val_348 +val_348 +348 +349 +349 +349 +349 +val_35 +val_35 +val_35 +35 +35 +35 +val_351 +351 +351 +352 +352 +val_353 +val_353 +353 +355 +355 +val_356 +356 +356 +358 +val_360 +360 +val_362 +363 +363 +363 +val_364 +364 +val_365 +val_366 +val_367 +val_367 +367 +367 +val_368 +val_369 +val_369 +val_369 +369 +val_37 +val_37 +371 +371 +371 +371 +val_373 +373 +val_374 +374 +val_375 +375 +375 +375 +375 +375 +376 +val_377 +val_378 +378 +val_379 +379 +381 +val_382 +val_382 +382 +382 +val_384 +val_384 +val_384 +384 +384 +384 +385 +385 +val_386 +386 +386 +388 +val_389 +389 +389 +390 +390 +390 +391 +391 +val_392 +392 +392 +val_393 +393 +393 +val_394 +val_395 +val_395 +395 +395 +val_396 +val_396 +val_396 +val_397 +val_397 +398 +val_399 +val_399 +399 +399 +val_4 +4 +40 +40 +val_400 +val_401 +val_401 +val_401 +val_401 +val_401 +401 +val_402 +402 +402 +402 +val_403 +val_403 +val_403 +val_404 +val_404 +404 +404 +404 +405 +val_406 +val_406 +val_406 +val_406 +406 +val_407 +407 +407 +407 +408 +408 +val_409 +val_409 +val_409 +409 +409 +val_41 +410 +val_411 +411 +412 +412 +val_413 +val_413 +413 +val_414 +val_414 +414 +415 +416 +val_417 +val_417 +val_417 +val_418 +val_419 +val_42 +val_42 +42 +42 +42 +val_421 +421 +421 +423 +val_424 +val_424 +424 +425 +426 +val_427 +427 +427 +428 +val_429 +val_429 +429 +429 +val_43 +val_430 +val_430 +val_430 +430 +val_431 +val_431 +val_431 +431 +val_432 +432 +val_435 +435 +val_436 +436 +val_437 +437 +val_438 +val_438 +val_438 +438 +438 +val_439 +val_439 +439 +439 +val_44 +440 +440 +441 +442 +val_443 +443 +443 +443 +val_444 +val_446 +446 +446 +447 +val_448 +448 +val_449 +450 +450 +451 +val_452 +val_453 +453 +val_454 +val_454 +val_454 +454 +454 +val_455 +455 +455 +val_457 +457 +457 +val_458 +val_458 +val_459 +val_459 +459 +46 +val_460 +461 +val_462 +val_462 +462 +val_463 +val_463 +463 +val_466 +val_466 +val_466 +val_467 +467 +val_468 +val_468 +val_468 +val_468 +468 +468 +468 +val_469 +val_469 +val_469 +val_469 +val_469 +469 +val_47 +47 +val_470 +470 +val_472 +473 +474 +474 +val_475 +475 +476 +476 +val_477 +477 +val_478 +val_478 +478 +478 +val_479 +48 +48 +val_480 +val_480 +val_480 +480 +480 +val_481 +481 +val_482 +482 +val_483 +val_484 +484 +val_485 +485 +485 +486 +val_487 +487 +488 +val_489 +val_489 +val_489 +val_489 +489 +49 +49 +val_490 +490 +val_491 +491 +491 +val_492 +val_492 +492 +492 +val_493 +val_494 +494 +494 +val_495 +495 +val_496 +496 +val_497 +497 +497 +val_498 +val_498 +val_498 +val_5 +val_5 +val_5 +5 +50 +val_51 +val_51 +51 +52 +52 +52 +52 +val_53 +53 +val_54 +56 +val_57 +val_58 +val_58 +58 +58 +59 +6 +6 +60 +61 +62 +62 +63 +val_64 +val_65 +65 +65 +val_66 +val_67 +val_67 +68 +val_69 +69 +val_70 +val_70 +val_70 +70 +71 +val_72 +val_72 +val_74 +75 +val_76 +val_76 +76 +76 +76 +val_77 +77 +77 +val_78 +78 +val_8 +8 +val_80 +80 +val_82 +82 +82 +val_83 +val_83 +val_84 +val_84 +val_85 +85 +val_86 +86 +val_87 +87 +87 +89 +89 +89 +val_9 +val_90 +val_90 +val_90 +91 +val_92 +93 +93 +93 +94 +val_95 +val_95 +val_96 +val_97 +val_97 +97 +97 +val_98 +val_98 +99 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java index 9898495..3f7dc63 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java @@ -35,10 +35,8 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.util.ReflectionUtils; /** @@ -53,98 +51,29 @@ private static final long serialVersionUID = 1L; protected static final Log LOG = LogFactory.getLog(DemuxOperator.class.getName()); - /** - * Handler is used to assign original tag (oldTag) to a row and - * track how many rows are forwarded to every child of DemuxOperator. - */ - protected static class Handler { - // oldTag is the tag assigned to ReduceSinkOperators BEFORE Correlation Optimizer - // optimizes the operator tree. newTag is the tag assigned to ReduceSinkOperators - // AFTER Correlation Optimizer optimizes the operator tree. - // Example: we have an operator tree shown below ... - // JOIN2 - // / \ - // GBY1 JOIN1 - // | / \ - // RS1 RS2 RS3 - // If GBY1, JOIN1, and JOIN2 are executed in the same Reducer - // (optimized by Correlation Optimizer), we will have ... - // oldTag: RS1:0, RS2:0, RS3:1 - // newTag: RS1:0, RS2:1, RS3:2 - // We need to know the mapping from the newTag to oldTag and revert - // the newTag to oldTag to make operators in the operator tree - // function correctly. - private final byte newTag; - private final byte oldTag; - private final byte childIndex; - private final ByteWritable oldTagByteWritable; - private final List forwardedRow; - - // counters for debugging - private transient long cntr = 0; - private transient long nextCntr = 1; - - private long getNextCntr(long cntr) { - // A very simple counter to keep track of number of rows processed by an - // operator. It dumps - // every 1 million times, and quickly before that - if (cntr >= 1000000) { - return cntr + 1000000; - } - return 10 * cntr; - } - - public long getCntr() { - return this.cntr; - } - - private final Log log; - private final boolean isLogInfoEnabled; - private final String id; - - public Handler(byte newTag, byte childIndex, byte oldTag, Log LOG, String id) - throws HiveException { - this.newTag = newTag; - this.oldTag = oldTag; - this.childIndex = childIndex; - this.oldTagByteWritable = new ByteWritable(oldTag); - this.log = LOG; - this.isLogInfoEnabled = LOG.isInfoEnabled(); - this.id = id; - this.forwardedRow = new ArrayList(3); - } - - public byte getOldTag() { - return oldTag; - } - - public Object process(Object row) throws HiveException { - forwardedRow.clear(); - List thisRow = (List) row; - forwardedRow.add(thisRow.get(0)); - forwardedRow.add(thisRow.get(1)); - forwardedRow.add(oldTagByteWritable); - - if (isLogInfoEnabled) { - cntr++; - if (cntr == nextCntr) { - log.info(id + " (newTag, childIndex, oldTag)=(" + newTag + ", " + childIndex + ", " - + oldTag + "), forwarding " + cntr + " rows"); - nextCntr = getNextCntr(cntr); - } - } - - return forwardedRow; - } - - public void printCloseOpLog() { - log.info(id + " (newTag, childIndex, oldTag)=(" + newTag + ", " + childIndex + ", " - + oldTag + "), forwarded " + cntr + " rows"); - } - } - - // The mapping from a newTag to its corresponding oldTag. Please see comments in - // DemuxOperator.Handler for explanations of newTag and oldTag. + // Counters for debugging, we cannot use existing counters (cntr and nextCntr) + // in Operator since we want to individually track the number of rows from + // different paths. + private transient long[] cntrs; + private transient long[] nextCntrs; + + // The mapping from a newTag to its corresponding oldTag. + // oldTag is the tag assigned to ReduceSinkOperators BEFORE Correlation Optimizer + // optimizes the operator tree. newTag is the tag assigned to ReduceSinkOperators + // AFTER Correlation Optimizer optimizes the operator tree. + // Example: we have an operator tree shown below ... + // JOIN2 + // / \ + // GBY1 JOIN1 + // | / \ + // RS1 RS2 RS3 + // If GBY1, JOIN1, and JOIN2 are executed in the same Reducer + // (optimized by Correlation Optimizer), we will have ... + // oldTag: RS1:0, RS2:0, RS3:1 + // newTag: RS1:0, RS2:1, RS3:2 + // We need to know the mapping from the newTag to oldTag and revert + // the newTag to oldTag to make operators in the operator tree + // function correctly. private Map newTagToOldTag = new HashMap(); @@ -153,10 +82,6 @@ public void printCloseOpLog() { private Map newTagToChildIndex = new HashMap(); - // The mapping from a newTag to its corresponding handler - private Map newTagToDispatchHandler = - new HashMap(); - // The mapping from the index of a child operator to its corresponding // inputObjectInspectors private Map childInputObjInspectors; @@ -183,24 +108,18 @@ public void printCloseOpLog() { @Override protected void initializeOp(Configuration hconf) throws HiveException { - this.newTagToOldTag = conf.getNewTagToOldTag(); - this.newTagToChildIndex = conf.getNewTagToChildIndex(); - this.newTagToDispatchHandler = new HashMap(); - this.childInputObjInspectors = new HashMap(); - - // For every newTag (every newTag corresponds to a ReduceSinkOperator), - // create a handler. Also, we initialize childInputObjInspectors at here. - for (Entry entry: newTagToOldTag.entrySet()) { - int newTag = entry.getKey(); - int oldTag = entry.getValue(); - int childIndex = newTagToChildIndex.get(newTag); - Handler handler = - new Handler((byte)newTag, (byte)childIndex, (byte)oldTag, LOG, id); - newTagToDispatchHandler.put(newTag, handler); - int childParentsCount = conf.getChildIndexToOriginalNumParents().get(childIndex); - childInputObjInspectors.put(childIndex, new ObjectInspector[childParentsCount]); + // A DemuxOperator should have at least one child + if (childOperatorsArray.length == 0) { + throw new HiveException( + "Expected number of children is at least 1. Found : " + childOperatorsArray.length); } + newTagToOldTag = conf.getNewTagToOldTag(); + newTagToChildIndex = conf.getNewTagToChildIndex(); + childInputObjInspectors = new HashMap(); + cntrs = new long[newTagToOldTag.size()]; + nextCntrs = new long[newTagToOldTag.size()]; + try { // We populate inputInspectors for all children of this DemuxOperator. // Those inputObjectInspectors are stored in childInputObjInspectors. @@ -208,6 +127,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { int newTag = e1.getKey(); int oldTag = e1.getValue(); int childIndex = newTagToChildIndex.get(newTag); + cntrs[newTag] = 0; + nextCntrs[newTag] = 0; TableDesc keyTableDesc = conf.getKeysSerializeInfos().get(newTag); Deserializer inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc .getDeserializerClass(), null); @@ -221,10 +142,15 @@ protected void initializeOp(Configuration hconf) throws HiveException { List oi = new ArrayList(); oi.add(inputKeyDeserializer.getObjectInspector()); oi.add(inputValueDeserializer.getObjectInspector()); - oi.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); + int childParentsCount = conf.getChildIndexToOriginalNumParents().get(childIndex); + // Multiple newTags can point to the same child (e.g. when the child is a JoinOperator). + // So, we first check if childInputObjInspectors contains the key of childIndex. + if (!childInputObjInspectors.containsKey(childIndex)) { + childInputObjInspectors.put(childIndex, new ObjectInspector[childParentsCount]); + } ObjectInspector[] ois = childInputObjInspectors.get(childIndex); ois[oldTag] = ObjectInspectorFactory - .getStandardStructObjectInspector(Utilities.fieldNameList, oi); + .getStandardStructObjectInspector(Utilities.reduceFieldNameList, oi); } } catch (Exception e) { throw new RuntimeException(e); @@ -257,9 +183,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { protected void initializeChildren(Configuration hconf) throws HiveException { state = State.INIT; LOG.info("Operator " + id + " " + getName() + " initialized"); - if (childOperators == null) { - return; - } LOG.info("Initializing children of " + id + " " + getName()); for (int i = 0; i < childOperatorsArray.length; i++) { LOG.info("Initializing child " + i + " " + childOperatorsArray[i].getIdentifier() + " " + @@ -304,52 +227,46 @@ protected void initializeChildren(Configuration hconf) throws HiveException { @Override public void processOp(Object row, int tag) throws HiveException { - int newTag = tag; - forward(row, inputObjInspectors[newTag]); - } - - @Override - public void forward(Object row, ObjectInspector rowInspector) - throws HiveException { - if ((++outputRows % 1000) == 0) { - if (counterNameToEnum != null) { - incrCounter(numOutputRowsCntr, outputRows); - outputRows = 0; + int childIndex = newTagToChildIndex.get(tag); + int oldTag = newTagToOldTag.get(tag); + if (isLogInfoEnabled) { + cntrs[tag]++; + if (cntrs[tag] == nextCntrs[tag]) { + LOG.info(id + " (newTag, childIndex, oldTag)=(" + tag + ", " + childIndex + ", " + + oldTag + "), forwarding " + cntrs[tag] + " rows"); + nextCntrs[tag] = getNextCntr(cntrs[tag]); } } - if (childOperatorsArray == null && childOperators != null) { - throw new HiveException("Internal Hive error during operator initialization."); - } - - if ((childOperatorsArray == null) || (getDone())) { - return; - } - - List thisRow = (List) row; - assert thisRow.size() == 3; - int newTag = ((ByteWritable) thisRow.get(2)).get(); - Handler handler = newTagToDispatchHandler.get(newTag); - int childIndex = newTagToChildIndex.get(newTag); - Operator o = childOperatorsArray[childIndex]; - if (o.getDone()) { + Operator child = childOperatorsArray[childIndex]; + if (child.getDone()) { childrenDone++; } else { - o.process(handler.process(row), handler.getOldTag()); + child.process(row, oldTag); } // if all children are done, this operator is also done if (childrenDone == childOperatorsArray.length) { setDone(true); } + } + @Override + public void forward(Object row, ObjectInspector rowInspector) + throws HiveException { + // DemuxOperator forwards a row to exactly one child in its children list + // based on the tag and newTagToChildIndex in processOp() method. + // So we need not to do anything in here. } @Override protected void closeOp(boolean abort) throws HiveException { - // log the number of rows forwarded from each dispatcherHandler - for (Handler handler: newTagToDispatchHandler.values()) { - handler.printCloseOpLog(); + for (Entry entry: newTagToOldTag.entrySet()) { + int newTag = entry.getKey(); + int oldTag = entry.getValue(); + int childIndex = newTagToChildIndex.get(newTag); + LOG.info(id + " (newTag, childIndex, oldTag)=(" + newTag + ", " + childIndex + ", " + + oldTag + "), forwarded " + cntrs[newTag] + " rows"); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java index d4be3d9..d5989ef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java @@ -31,10 +31,8 @@ import org.apache.hadoop.hive.ql.plan.MuxDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; /** * MuxOperator is used in the Reduce side of MapReduce jobs optimized by Correlation Optimizer. @@ -75,12 +73,14 @@ protected static final Log LOG = LogFactory.getLog(MuxOperator.class.getName()); /** - * Handler is used to construct key-value-tag structure and assign original tag to a row. + * Handler is used to construct the key-value structure. + * This structure is needed by child JoinOperators and GroupByOperators of + * a MuxOperator to function correctly. */ protected static class Handler { private final ObjectInspector outputObjInspector; private final int tag; - private final ByteWritable tagByteWritable; + /** * The evaluators for the key columns. Key columns decide the sort order on * the reducer side. Key columns are passed to the reducer in the "key". @@ -117,7 +117,6 @@ public Handler(ObjectInspector inputObjInspector, outputValue = new Object[valueEval.length]; this.tag = tag; - this.tagByteWritable = new ByteWritable((byte)tag.intValue()); ObjectInspector keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, outputKeyColumnNames, inputObjInspector); @@ -126,10 +125,9 @@ public Handler(ObjectInspector inputObjInspector, List ois = new ArrayList(); ois.add(keyObjectInspector); ois.add(valueObjectInspector); - ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); this.outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector( - Utilities.fieldNameList, ois); - this.forwardedRow = new ArrayList(3); + Utilities.reduceFieldNameList, ois); + this.forwardedRow = new ArrayList(Utilities.reduceFieldNameList.size()); } public ObjectInspector getOutputObjInspector() { @@ -155,7 +153,6 @@ public Object process(Object row) throws HiveException { // to a list. forwardedRow.add(Arrays.asList(outputKey)); forwardedRow.add(Arrays.asList(outputValue)); - forwardedRow.add(tagByteWritable); return forwardedRow; } } @@ -166,23 +163,14 @@ public Object process(Object row) throws HiveException { private transient boolean[] processGroupCalled; private Handler[] handlers; - //counters for debugging - private transient long[] cntr; - private transient long[] nextCntr; - - private long getNextCntr(long cntr) { - // A very simple counter to keep track of number of rows processed by an - // operator. It dumps - // every 1 million times, and quickly before that - if (cntr >= 1000000) { - return cntr + 1000000; - } - return 10 * cntr; - } + // Counters for debugging, we cannot use existing counters (cntr and nextCntr) + // in Operator since we want to individually track the number of rows from different inputs. + private transient long[] cntrs; + private transient long[] nextCntrs; @Override protected void initializeOp(Configuration hconf) throws HiveException { - // A MuxOperator should only has a single child + // A MuxOperator should only have a single child if (childOperatorsArray.length != 1) { throw new HiveException( "Expected number of children is 1. Found : " + childOperatorsArray.length); @@ -192,8 +180,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { processGroupCalled = new boolean[numParents]; outputObjectInspectors = new ObjectInspector[numParents]; handlers = new Handler[numParents]; - cntr = new long[numParents]; - nextCntr = new long[numParents]; + cntrs = new long[numParents]; + nextCntrs = new long[numParents]; for (int i = 0; i < numParents; i++) { processGroupCalled[i] = false; if (conf.getParentToKeyCols().get(i) == null) { @@ -213,8 +201,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { forward[i] = false; outputObjectInspectors[i] = handlers[i].getOutputObjInspector(); } - cntr[i] = 0; - nextCntr[i] = 1; + cntrs[i] = 0; + nextCntrs[i] = 1; } initializeChildren(hconf); } @@ -239,46 +227,31 @@ protected void initializeChildren(Configuration hconf) throws HiveException { @Override public void processOp(Object row, int tag) throws HiveException { - forward(row, tag); - } - - protected void forward(Object row, int tag) - throws HiveException { - - if (childOperatorsArray == null && childOperators != null) { - throw new HiveException( - "Internal Hive error during operator initialization."); - } - - if ((childOperatorsArray == null) || (getDone())) { - return; + if (isLogInfoEnabled) { + cntrs[tag]++; + if (cntrs[tag] == nextCntrs[tag]) { + LOG.info(id + ", tag=" + tag + ", forwarding " + cntrs[tag] + " rows"); + nextCntrs[tag] = getNextCntr(cntrs[tag]); + } } int childrenDone = 0; for (int i = 0; i < childOperatorsArray.length; i++) { - Operator o = childOperatorsArray[i]; - if (o.getDone()) { + Operator child = childOperatorsArray[i]; + if (child.getDone()) { childrenDone++; } else { if (forward[tag]) { // No need to evaluate, just forward it. - o.process(row, tag); + child.process(row, tag); } else { // Call the corresponding handler to evaluate this row and // forward the result - o.process(handlers[tag].process(row), handlers[tag].getTag()); + child.process(handlers[tag].process(row), handlers[tag].getTag()); } } } - if (isLogInfoEnabled) { - cntr[tag]++; - if (cntr[tag] == nextCntr[tag]) { - LOG.info(id + ", tag=" + tag + ", forwarding " + cntr[tag] + " rows"); - nextCntr[tag] = getNextCntr(cntr[tag]); - } - } - // if all children are done, this operator is also done if (childrenDone == childOperatorsArray.length) { setDone(true); @@ -286,6 +259,16 @@ protected void forward(Object row, int tag) } @Override + public void forward(Object row, ObjectInspector rowInspector) + throws HiveException { + // Because we need to revert the tag of a row to its old tag and + // we cannot pass new tag to this method which is used to get + // the old tag from the mapping of newTagToOldTag, we bypass + // this method in MuxOperator and directly call process on children + // in processOp() method.. + } + + @Override public void startGroup() throws HiveException{ for (int i = 0; i < numParents; i++) { processGroupCalled[i] = false; @@ -320,7 +303,7 @@ public void processGroup(int tag) throws HiveException { @Override protected void closeOp(boolean abort) throws HiveException { for (int i = 0; i < numParents; i++) { - LOG.info(id + ", tag=" + i + ", forwarded " + cntr[i] + " rows"); + LOG.info(id + ", tag=" + i + ", forwarded " + cntrs[i] + " rows"); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index ee76917..dfa309b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -794,7 +794,7 @@ public void replaceParent(Operator parent, parentOperators.set(parentIndex, newParent); } - private long getNextCntr(long cntr) { + protected long getNextCntr(long cntr) { // A very simple counter to keep track of number of rows processed by an // operator. It dumps // every 1 million times, and quickly before that diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index cbda70b..b707d85 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -123,9 +123,9 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.Adjacency; import org.apache.hadoop.hive.ql.plan.api.Graph; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; @@ -171,17 +171,16 @@ * ReduceField: * KEY: record key * VALUE: record value - * ALIAS: the tag identifying the source of a record */ public static enum ReduceField { - KEY, VALUE, ALIAS + KEY, VALUE }; - public static List fieldNameList; + public static List reduceFieldNameList; static { - fieldNameList = new ArrayList(); + reduceFieldNameList = new ArrayList(); for (ReduceField r : ReduceField.values()) { - fieldNameList.add(r.toString()); + reduceFieldNameList.add(r.toString()); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java index d12a53c..8b2653e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java @@ -29,8 +29,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.MapredContext; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -40,10 +40,8 @@ import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; @@ -54,10 +52,10 @@ import org.apache.hadoop.util.StringUtils; /** - * ExecReducer is the generic Reducer class for Hive. Together with ExecMapper it is + * ExecReducer is the generic Reducer class for Hive. Together with ExecMapper it is * the bridge between the map-reduce framework and the Hive operator pipeline at * execution time. It's main responsabilities are: - * + * * - Load and setup the operator pipeline from XML * - Run the pipeline by transforming key, value pairs to records and forwarding them to the operators * - Sending start and end group messages to separate records with same key from one another @@ -139,9 +137,8 @@ public void configure(JobConf job) { ArrayList ois = new ArrayList(); ois.add(keyObjectInspector); ois.add(valueObjectInspector[tag]); - ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector); rowObjectInspector[tag] = ObjectInspectorFactory - .getStandardStructObjectInspector(Utilities.fieldNameList, ois); + .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois); } } catch (Exception e) { throw new RuntimeException(e); @@ -169,8 +166,7 @@ public void configure(JobConf job) { private BytesWritable groupKey; - List row = new ArrayList(3); - ByteWritable tag = new ByteWritable(); + List row = new ArrayList(Utilities.reduceFieldNameList.size()); public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { @@ -178,7 +174,7 @@ public void reduce(Object key, Iterator values, OutputCollector output, return; } if (oc == null) { - // propagete reporter and output collector to all operators + // propagate reporter and output collector to all operators oc = output; rp = reporter; reducer.setOutputCollector(oc); @@ -188,11 +184,12 @@ public void reduce(Object key, Iterator values, OutputCollector output, try { BytesWritable keyWritable = (BytesWritable) key; - tag.set((byte) 0); + byte tag = 0; if (isTagged) { - // remove the tag + // remove the tag from key coming out of reducer + // and store it in separate variable. int size = keyWritable.getSize() - 1; - tag.set(keyWritable.get()[size]); + tag = keyWritable.get()[size]; keyWritable.setSize(size); } @@ -226,22 +223,19 @@ public void reduce(Object key, Iterator values, OutputCollector output, BytesWritable valueWritable = (BytesWritable) values.next(); // System.err.print(who.getHo().toString()); try { - valueObject[tag.get()] = inputValueDeserializer[tag.get()] - .deserialize(valueWritable); + valueObject[tag] = inputValueDeserializer[tag].deserialize(valueWritable); } catch (SerDeException e) { throw new HiveException( "Hive Runtime Error: Unable to deserialize reduce input value (tag=" - + tag.get() + + tag + ") from " + Utilities.formatBinaryString(valueWritable.get(), 0, valueWritable.getSize()) + " with properties " - + valueTableDesc[tag.get()].getProperties(), e); + + valueTableDesc[tag].getProperties(), e); } row.clear(); row.add(keyObject); - row.add(valueObject[tag.get()]); - // The tag is not used any more, we should remove it. - row.add(tag); + row.add(valueObject[tag]); if (isLogInfoEnabled) { cntr++; if (cntr == nextCntr) { @@ -252,17 +246,17 @@ public void reduce(Object key, Iterator values, OutputCollector output, } } try { - reducer.process(row, tag.get()); + reducer.process(row, tag); } catch (Exception e) { String rowString = null; try { - rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag.get()]); + rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]); } catch (Exception e2) { rowString = "[Error getting row data with exception " + StringUtils.stringifyException(e2) + " ]"; } throw new HiveException("Hive Runtime Error while processing row (tag=" - + tag.get() + ") " + rowString, e); + + tag + ") " + rowString, e); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java index 6a74ae4..48c9849 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java @@ -22,6 +22,7 @@ import java.io.FileInputStream; import java.util.ArrayList; import java.util.LinkedList; +import java.util.List; import junit.framework.TestCase; @@ -54,6 +55,7 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.log4j.Logger; /** * Mimics the actual query compiler in generating end to end plans and testing @@ -66,6 +68,7 @@ private static String tmpdir = System.getProperty("java.io.tmpdir") + File.separator + System.getProperty("user.name") + File.separator; + private static Logger LOG = Logger.getLogger(TestExecDriver.class); private static Path tmppath = new Path(tmpdir); private static Hive db; private static FileSystem fs; @@ -85,7 +88,7 @@ + tmpdir); } } - + LOG.info("Directory of actual files: " + tmppath); for (Object one : Utilities.makeList("mapplan1.out", "mapplan2.out", "mapredplan1.out", "mapredplan2.out", "mapredplan3.out", "mapredplan4.out", "mapredplan5.out", "mapredplan6.out")) { @@ -100,6 +103,7 @@ Path[] hadoopDataFile = new Path[2]; String[] testFiles = {"kv1.txt", "kv2.txt"}; String testFileDir = new Path(conf.get("test.data.files")).toUri().getPath(); + LOG.info("Directory of expected files: " + testFileDir); for (String oneFile : testFiles) { Path localDataFile = new Path(testFileDir, oneFile); hadoopDataFile[i] = new Path(tmppath, oneFile); @@ -142,7 +146,6 @@ public static void addMapWork(MapredWork mr, Table tbl, String alias, Operator outputColumns = new ArrayList(); + List outputColumns = new ArrayList(); for (int i = 0; i < 2; i++) { outputColumns.add("_col" + i); } @@ -312,12 +314,10 @@ private void populateMapRedPlan3(Table src, Table src2) throws SemanticException + "mapredplan3.out", Utilities.defaultTd, false)); Operator op5 = OperatorFactory.get(new SelectDesc(Utilities - .makeList(getStringColumn(Utilities.ReduceField.ALIAS.toString()), - new ExprNodeFieldDesc(TypeInfoFactory.stringTypeInfo, - new ExprNodeColumnDesc(TypeInfoFactory - .getListTypeInfo(TypeInfoFactory.stringTypeInfo), - Utilities.ReduceField.VALUE.toString(), "", false), "0", - false)), outputColumns), op4); + .makeList(new ExprNodeFieldDesc(TypeInfoFactory.stringTypeInfo, + new ExprNodeColumnDesc(TypeInfoFactory.getListTypeInfo(TypeInfoFactory.stringTypeInfo), + Utilities.ReduceField.VALUE.toString(), "", false), "0", false)), + Utilities.makeList(outputColumns.get(0))), op4); mr.setReducer(op5); } @@ -447,16 +447,16 @@ private void executePlan() throws Exception { int exitVal = mrtask.execute(dctx); if (exitVal != 0) { - System.out.println(testName + " execution failed with exit status: " + LOG.error(testName + " execution failed with exit status: " + exitVal); assertEquals(true, false); } - System.out.println(testName + " execution completed successfully"); + LOG.info(testName + " execution completed successfully"); } public void testMapPlan1() throws Exception { - System.out.println("Beginning testMapPlan1"); + LOG.info("Beginning testMapPlan1"); try { populateMapPlan1(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, "src")); @@ -470,7 +470,7 @@ public void testMapPlan1() throws Exception { public void testMapPlan2() throws Exception { - System.out.println("Beginning testMapPlan2"); + LOG.info("Beginning testMapPlan2"); try { populateMapPlan2(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, "src")); @@ -484,7 +484,7 @@ public void testMapPlan2() throws Exception { public void testMapRedPlan1() throws Exception { - System.out.println("Beginning testMapRedPlan1"); + LOG.info("Beginning testMapRedPlan1"); try { populateMapRedPlan1(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, @@ -499,7 +499,7 @@ public void testMapRedPlan1() throws Exception { public void testMapRedPlan2() throws Exception { - System.out.println("Beginning testMapPlan2"); + LOG.info("Beginning testMapPlan2"); try { populateMapRedPlan2(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, @@ -514,7 +514,7 @@ public void testMapRedPlan2() throws Exception { public void testMapRedPlan3() throws Exception { - System.out.println("Beginning testMapPlan3"); + LOG.info("Beginning testMapPlan3"); try { populateMapRedPlan3(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, @@ -529,7 +529,7 @@ public void testMapRedPlan3() throws Exception { public void testMapRedPlan4() throws Exception { - System.out.println("Beginning testMapPlan4"); + LOG.info("Beginning testMapPlan4"); try { populateMapRedPlan4(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, @@ -544,7 +544,7 @@ public void testMapRedPlan4() throws Exception { public void testMapRedPlan5() throws Exception { - System.out.println("Beginning testMapPlan5"); + LOG.info("Beginning testMapPlan5"); try { populateMapRedPlan5(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, @@ -559,7 +559,7 @@ public void testMapRedPlan5() throws Exception { public void testMapRedPlan6() throws Exception { - System.out.println("Beginning testMapPlan6"); + LOG.info("Beginning testMapPlan6"); try { populateMapRedPlan6(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,