Skip to content

Commit abb1f24

Browse files
author
Philipp Richter
committed
usage of broadcastrecordwriter implemented..
1 parent 32edb00 commit abb1f24

File tree

3 files changed

+279
-254
lines changed

3 files changed

+279
-254
lines changed

nephele/nephele-server/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@
3030
import java.util.List;
3131
import java.util.Map;
3232

33+
import javax.swing.plaf.basic.BasicTreeUI.TreeToggleAction;
34+
35+
import org.apache.commons.logging.Log;
36+
import org.apache.commons.logging.LogFactory;
37+
3338
import eu.stratosphere.nephele.configuration.GlobalConfiguration;
3439
import eu.stratosphere.nephele.execution.ExecutionState;
3540
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
@@ -39,6 +44,7 @@
3944
import eu.stratosphere.nephele.io.channels.AbstractOutputChannel;
4045
import eu.stratosphere.nephele.io.channels.ChannelID;
4146
import eu.stratosphere.nephele.jobgraph.JobID;
47+
import eu.stratosphere.nephele.jobmanager.JobManager;
4248
import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
4349
import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
4450
import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
@@ -53,6 +59,8 @@
5359

5460
public class MulticastManager implements ChannelLookupProtocol {
5561

62+
private static final Log LOG = LogFactory.getLog(JobManager.class);
63+
5664
// Indicates whether topology information is available and will be used in order to construct
5765
// the multicast overlay-tree.
5866
private final boolean topologyaware;
@@ -111,60 +119,58 @@ public MulticastManager(final AbstractScheduler scheduler) {
111119
*/
112120
public synchronized ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID,
113121
ChannelID sourceChannelID) {
114-
System.out.println("==RECEIVING REQUEST FROM " + caller + " == SOURCE CHANNEL: " + sourceChannelID);
122+
123+
LOG.info("Receiving multicast receiver request from " + caller + " channel ID: " + sourceChannelID);
124+
115125
// check, if the tree is already created and cached
116126
if (this.cachedTrees.containsKey(sourceChannelID)) {
117-
System.out.println("==RETURNING CACHED ENTRY TO " + caller + " ==");
118-
System.out.println(cachedTrees.get(sourceChannelID).getConnectionInfo(caller));
119-
127+
LOG.info("Replying with cached entry...");
120128
return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);
121129
} else {
122130

123131
// no tree exists - we assume that this is the sending node initiating a multicast
124132

125-
// first check, if all receivers are up and ready
126133
if (!checkIfAllTargetVerticesExist(caller, jobID, sourceChannelID)) {
127-
// not all target vertices exist..
128-
System.out.println("== NOT ALL RECEIVERS FOUND==");
134+
LOG.info("Received multicast request but not all receivers found.");
129135
return ConnectionInfoLookupResponse.createReceiverNotFound();
130136
}
131137

132138
if (!checkIfAllTargetVerticesReady(caller, jobID, sourceChannelID)) {
133-
// not all target vertices are ready..
134-
System.out.println("== NOT ALL RECEIVERS READY==");
139+
LOG.info("Received multicast request but not all receivers ready.");
140+
135141
return ConnectionInfoLookupResponse.createReceiverNotReady();
136142
}
137143

138-
// receivers up and running.. create tree
144+
// receivers up and running.. extract tree nodes...
139145
LinkedList<TreeNode> treenodes = extractTreeNodes(caller, jobID, sourceChannelID, this.randomized);
140146

141-
// first check, if we want to use a hard-coded tree topology...
147+
// Do we want to use a hard-coded tree topology?
142148
if (this.usehardcodedtree) {
149+
LOG.info("Creating a hard-coded tree topology from file: " + hardcodedtreefilepath);
143150
cachedTrees.put(sourceChannelID, createHardCodedTree(treenodes));
144-
System.out.println("==RETURNING ENTRY TO " + caller + " ==");
145-
System.out.println(cachedTrees.get(sourceChannelID).getConnectionInfo(caller));
146-
System.out.println("==END ENTRY==");
147151
return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);
148152
}
149153

150-
// if we want to use penalties, we now load the penalties from the harddisk
154+
// Do we want to use penalties from a penalty file?
151155
if (this.usepenalties && this.penaltyfilepath != null) {
152-
System.out.println("reading penalty file from: " + this.penaltyfilepath);
156+
LOG.info("reading penalty file from: " + this.penaltyfilepath);
153157
File f = new File(this.penaltyfilepath);
154158
readPenalitesFromFile(f, treenodes);
155159
}
156160

157161
cachedTrees.put(sourceChannelID, createDefaultTree(treenodes, this.treebranching));
158-
159-
System.out.println("==RETURNING ENTRY TO " + caller + " ==");
160-
System.out.println(cachedTrees.get(sourceChannelID).getConnectionInfo(caller));
161-
System.out.println("==END ENTRY==");
162162
return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);
163163

164164
}
165165

166166
}
167167

168+
/**
169+
* Returns and removes the TreeNode which is closest to the given indicator.
170+
* @param indicator
171+
* @param nodes
172+
* @return
173+
*/
168174
private TreeNode pollClosestNode(TreeNode indicator, LinkedList<TreeNode> nodes) {
169175

170176
TreeNode closestnode = getClosestNode(indicator, nodes);
@@ -175,6 +181,14 @@ private TreeNode pollClosestNode(TreeNode indicator, LinkedList<TreeNode> nodes)
175181

176182
}
177183

184+
/**
185+
* Returns the TreeNode which is closest to the given indicator Node. Proximity is determined
186+
* either using topology-information (if given), penalty information (if given) or it returns
187+
* the first node in the list.
188+
* @param indicator
189+
* @param nodes
190+
* @return
191+
*/
178192
private TreeNode getClosestNode(TreeNode indicator, LinkedList<TreeNode> nodes) {
179193

180194
if (indicator == null || !this.topologyaware && !this.usepenalties) {
@@ -248,6 +262,12 @@ private MulticastForwardingTable createDefaultTree(LinkedList<TreeNode> nodes, i
248262

249263
}
250264

265+
/**
266+
* Reads a hard-coded tree topology from file and creates a tree according to the hard-coded
267+
* topology from the file.
268+
* @param nodes
269+
* @return
270+
*/
251271
private MulticastForwardingTable createHardCodedTree(LinkedList<TreeNode> nodes) {
252272
try {
253273
FileInputStream fstream = new FileInputStream(this.hardcodedtreefilepath);
@@ -346,15 +366,13 @@ private boolean checkIfAllTargetVerticesReady(InstanceConnectionInfo caller, Job
346366
*/
347367
private LinkedList<TreeNode> extractTreeNodes(InstanceConnectionInfo source, JobID jobID,
348368
ChannelID sourceChannelID, boolean randomize) {
349-
System.out.println("==NO CACHE ENTRY FOUND. CREATING TREE==");
369+
350370
final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
351371

352372
final AbstractOutputChannel<? extends Record> outputChannel = eg.getOutputChannelByID(sourceChannelID);
353373

354374
final OutputGate<? extends Record> broadcastgate = outputChannel.getOutputGate();
355375

356-
System.out.println("Output gate is: " + broadcastgate.toString());
357-
358376
final LinkedList<AbstractOutputChannel<? extends Record>> outputChannels = new LinkedList<AbstractOutputChannel<? extends Record>>();
359377

360378
// get all broadcast output channels
@@ -364,7 +382,6 @@ private LinkedList<TreeNode> extractTreeNodes(InstanceConnectionInfo source, Job
364382
}
365383
}
366384

367-
System.out.println("Number of output channels attached: " + outputChannels.size());
368385

369386
for (AbstractOutputChannel<? extends Record> c : broadcastgate.getOutputChannels()) {
370387
System.out.println("Out channel ID: "

0 commit comments

Comments
 (0)