Skip to content

Commit c0ac81c

Browse files
author
sewen
committed
Extended Configuration for binary data.
Fixed de/encoding problem in TaskConfig.
1 parent 8b3c4f1 commit c0ac81c

File tree

2 files changed

+47
-4
lines changed
  • nephele/nephele-common/src/main/java/eu/stratosphere/nephele/configuration
  • pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util

2 files changed

+47
-4
lines changed

nephele/nephele-common/src/main/java/eu/stratosphere/nephele/configuration/Configuration.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Map;
2525
import java.util.Set;
2626

27+
import org.apache.commons.codec.binary.Base64;
2728
import org.apache.commons.logging.Log;
2829
import org.apache.commons.logging.LogFactory;
2930

@@ -349,6 +350,48 @@ public void setFloat(final String key, final float value) {
349350
this.confData.put(key, Float.toString(value));
350351
}
351352
}
353+
354+
355+
/**
356+
* Returns the value associated with the given key as a byte array.
357+
*
358+
* @param key
359+
* The key pointing to the associated value.
360+
* @param defaultValue
361+
* The default value which is returned in case there is no value associated with the given key.
362+
* @return the (default) value associated with the given key.
363+
*/
364+
public byte[] getBytes(final String key, final byte[] defaultValue) {
365+
final String encoded;
366+
synchronized (this.confData) {
367+
encoded = this.confData.get(key);
368+
}
369+
if (encoded == null) {
370+
return defaultValue;
371+
}
372+
373+
return Base64.decodeBase64(encoded.getBytes());
374+
}
375+
376+
/**
377+
* Adds the given byte array to the configuration object. If key is <code>null</code> then nothing is added.
378+
*
379+
* @param key
380+
* The key under which the bytes are added.
381+
* @param bytes
382+
* The bytes to be added.
383+
*/
384+
public void setBytes(final String key, final byte[] bytes) {
385+
if (key == null) {
386+
LOG.warn("Cannot set boolean: Given key is null!");
387+
return;
388+
}
389+
390+
final String encoded = new String(Base64.encodeBase64(bytes));
391+
synchronized (this.confData) {
392+
this.confData.put(key, encoded);
393+
}
394+
}
352395

353396
/**
354397
* Returns the keys of all key/value pairs stored inside this

pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -434,8 +434,8 @@ public void setOutputDataDistribution(DataDistribution distribution)
434434
} catch (IOException e) {
435435
throw new RuntimeException("Error serializing the DataDistribution: " + e.getMessage(), e);
436436
}
437-
final String stateEncoded = baos.toString();
438-
this.config.setString(OUTPUT_DATA_DISTRIBUTION_STATE, stateEncoded);
437+
438+
this.config.setBytes(OUTPUT_DATA_DISTRIBUTION_STATE, baos.toByteArray());
439439
}
440440

441441
public DataDistribution getOutputDataDistribution(final ClassLoader cl) throws ClassNotFoundException
@@ -455,13 +455,13 @@ public DataDistribution getOutputDataDistribution(final ClassLoader cl) throws C
455455

456456
final DataDistribution distribution = InstantiationUtil.instantiate(clazz, DataDistribution.class);
457457

458-
final String stateEncoded = this.config.getString(OUTPUT_DATA_DISTRIBUTION_STATE, null);
458+
final byte[] stateEncoded = this.config.getBytes(OUTPUT_DATA_DISTRIBUTION_STATE, null);
459459
if (stateEncoded == null) {
460460
throw new CorruptConfigurationException(
461461
"The configuration contained the data distribution type, but no serialized state.");
462462
}
463463

464-
final ByteArrayInputStream bais = new ByteArrayInputStream(stateEncoded.getBytes());
464+
final ByteArrayInputStream bais = new ByteArrayInputStream(stateEncoded);
465465
final DataInputStream in = new DataInputStream(bais);
466466

467467
try {

0 commit comments

Comments
 (0)