-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPIG-5210.patch
126 lines (121 loc) · 5.2 KB
/
PIG-5210.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
Index: src/org/apache/pig/PigConfiguration.java
===================================================================
--- src/org/apache/pig/PigConfiguration.java (revision 1790033)
+++ src/org/apache/pig/PigConfiguration.java (working copy)
@@ -508,6 +508,8 @@
public static final String PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT = "true";
+ public static final String PIG_PRINT_EXEC_PLAN = "pig.print.exec.plan";
+
// Deprecated settings of Pig 0.13
/**
Index: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
===================================================================
--- src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (revision 1790033)
+++ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (working copy)
@@ -321,6 +321,12 @@
if(mro instanceof NativeMapReduceOper) {
return null;
}
+
+ //Print MR plan before launching if needed
+ if (conf.getBoolean(PigConfiguration.PIG_PRINT_EXEC_PLAN, false)) {
+ log.info(mro.toString());
+ }
+
Job job = getJob(plan, mro, conf, pigContext);
jobMroMap.put(job, mro);
jobCtrl.addJob(job);
Index: src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
===================================================================
--- src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (revision 1790033)
+++ src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (working copy)
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.PigATSClient;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
@@ -110,6 +111,12 @@
for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
log.info("Local resource: " + entry.getKey());
}
+
+ // Print Tez plan before launching if needed
+ if (conf.getBoolean(PigConfiguration.PIG_PRINT_EXEC_PLAN, false)) {
+ log.info(tezPlanNode.getTezOperPlan());
+ }
+
DAG tezDag = buildDAG(tezPlanNode, localResources);
tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
// set Tez caller context
Index: test/org/apache/pig/test/TestEvalPipelineLocal.java
===================================================================
--- test/org/apache/pig/test/TestEvalPipelineLocal.java (revision 1790033)
+++ test/org/apache/pig/test/TestEvalPipelineLocal.java (working copy)
@@ -17,6 +17,8 @@
*/
package org.apache.pig.test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -25,6 +27,7 @@
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -37,6 +40,12 @@
import junit.framework.Assert;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Appender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.WriterAppender;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -58,6 +67,7 @@
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
import org.apache.pig.test.utils.Identity;
import org.junit.Assume;
import org.junit.Before;
@@ -1251,7 +1261,17 @@
@Test
public void testBytesRawComparatorDesc() throws Exception{
File f1 = createFile(new String[]{"2", "1", "4", "3"});
-
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Logger logger = Logger.getRootLogger();
+
+ logger.setLevel(Level.INFO);
+ SimpleLayout layout = new SimpleLayout();
+ Appender appender = new WriterAppender(layout, new PrintStream(bos));
+ logger.addAppender(appender);
+
+ // Also test PIG-5210 here in the same test
+ pigServer.getPigContext().getProperties().setProperty("pig.print.exec.plan", "true");
pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext())
+ "' as (value:long);");
pigServer.registerQuery("b = foreach a generate " + TOTUPLENOINNERSCHEMA.class.getName() + "(value);");
@@ -1264,5 +1284,9 @@
Assert.assertEquals(iter.next().toString(), "(2)");
Assert.assertEquals(iter.next().toString(), "(1)");
Assert.assertFalse(iter.hasNext());
+
+ logger.removeAppender(appender);
+
+ Assert.assertTrue(bos.toString().contains("New For Each(false,false)[tuple]"));
}
}