@@ -20,6 +20,5 @@
 import java.io.File;
 import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.management.ManagementFactory;
@@ -46,15 +45,13 @@
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.BucketMatcher;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SecureCmdDoAs;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
@@ -340,6 +337,12 @@ public void startForward(String bigTableBucket) throws Exception {
 
   private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
       throws Exception {
+    for (Operator<?> source : work.getAliasToWork().values()) {
+      source.reset();
+    }
+    if (inputFileChangeSenstive) {
+      execContext.setCurrentBigBucketFile(bigTableBucket);
+    }
     for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
       String alias = entry.getKey();
       FetchOperator fetchOp = entry.getValue();
@@ -351,13 +354,6 @@ private void startForward(boolean inputFileChangeSenstive, String bigTableBucket
 
       // get the root operator
       Operator<? extends OperatorDesc> forwardOp = work.getAliasToWork().get(alias);
-      if (fetchOp.isEmptyTable()) {
-        //generate empty hashtable for empty table
-        this.generateDummyHashTable(alias, bigTableBucket);
-        forwardOp.close(false);
-        continue;
-      }
-
       // walk through the operator tree
       while (!forwardOp.getDone()) {
         InspectableObject row = fetchOp.getNextRow();
@@ -366,11 +362,10 @@ private void startForward(boolean inputFileChangeSenstive, String bigTableBucket
         }
         forwardOp.processOp(row.o, 0);
       }
-      if (inputFileChangeSenstive) {
-        execContext.setCurrentBigBucketFile(bigTableBucket);
-        forwardOp.reset();
-      }
-      forwardOp.close(false);
+      forwardOp.flush();
+    }
+    for (Operator<?> source : work.getAliasToWork().values()) {
+      source.close(false);
     }
   }
 
@@ -421,43 +416,6 @@ private void initializeOperators(Map<FetchOperator, JobConf> fetchOpJobConfMap)
     }
   }
 
-  private void generateDummyHashTable(String alias, String bigBucketFileName)
-      throws HiveException,IOException {
-    LOG.debug("generating dummy for " + alias);
-    // find the (byte)tag for the map join(HashTableSinkOperator)
-    Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
-    Operator<? extends OperatorDesc> childOp = parentOp.getChildOperators().get(0);
-    while ((childOp != null) && (!(childOp instanceof HashTableSinkOperator))) {
-      parentOp = childOp;
-      assert parentOp.getChildOperators().size() == 1;
-      childOp = parentOp.getChildOperators().get(0);
-    }
-    if (childOp == null) {
-      throw new HiveException(
-          "Cannot find HashTableSink op by tracing down the table scan operator tree");
-    }
-    byte tag = (byte) childOp.getParentOperators().indexOf(parentOp);
-
-    // generate empty hashtable for this (byte)tag
-    Path tmpPath = this.getWork().getTmpPath();
-
-    String fileName = work.getBucketFileName(bigBucketFileName);
-
-    HashTableSinkOperator htso = (HashTableSinkOperator)childOp;
-    Path path = Utilities.generatePath(tmpPath, htso.getConf().getDumpFilePrefix(),
-        tag, fileName);
-    console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + path);
-    FileSystem fs = path.getFileSystem(job);
-    ObjectOutputStream out = new ObjectOutputStream(fs.create(path));
-    try {
-      MapJoinTableContainerSerDe.persistDummyTable(out);
-    } finally {
-      out.close();
-    }
-    console.printInfo(Utilities.now() + "\tUpload 1 File to: " + path + " File size: "
-        + fs.getFileStatus(path).getLen());
-  }
-
   private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile)
       throws Exception {
 