Skip to content

Commit 83dba38

Browse files
committed
Merge branch 'release/0.13.1'
2 parents 4decf8b + 1261a10 commit 83dba38

File tree

9 files changed

+26
-10
lines changed

9 files changed

+26
-10
lines changed

CHANGES.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Scalding #
22

3+
### Version 0.13.1 ###
4+
* Back out 4 changes to be binary compatible: https://github.com/twitter/scalding/pull/1187
5+
* Use java.util.Random instead of scala.util.Random: https://github.com/twitter/scalding/pull/1186
6+
* Add Execution.failed: https://github.com/twitter/scalding/pull/1185
7+
* Using a ConcurrentHashMap instead of a WeakHashMap to make the Stats behave in a correct manner: https://github.com/twitter/scalding/pull/1184
8+
* Add applicative for Execution: https://github.com/twitter/scalding/pull/1181
9+
310
### Version 0.13.0 ###
411
* Covert LzoTextDelimited to Cascading scheme.: https://github.com/twitter/scalding/pull/1179
512
* Make TraceUtil support versions of cascading older than 2.6: https://github.com/twitter/scalding/pull/1180

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs.
44

55
![Scalding Logo](https://raw.github.com/twitter/scalding/develop/logo/scalding.png)
66

7-
Current version: `0.13.0`
7+
Current version: `0.13.1`
88

99
## Word Count
1010

scalding-core/src/main/scala/com/twitter/package.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ package object scalding {
3030
type KeyedList[K, +V] = com.twitter.scalding.typed.KeyedList[K, V]
3131
type ValuePipe[+T] = com.twitter.scalding.typed.ValuePipe[T]
3232
type Grouped[K, +V] = com.twitter.scalding.typed.Grouped[K, V]
33+
3334
/**
3435
* Make sure this is in sync with version.sbt
3536
*/
36-
val scaldingVersion: String = "0.13.0"
37+
val scaldingVersion: String = "0.13.1"
3738

3839
object RichPathFilter {
3940
implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f)

scalding-core/src/main/scala/com/twitter/scalding/Execution.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
1212
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
See the License for the specific language governing permissions and
1414
limitations under the License.
15-
*/
15+
*/
1616
package com.twitter.scalding
1717

1818
import com.twitter.algebird.monad.Reader
@@ -193,6 +193,7 @@ object Execution {
193193
override def apply[T](t: T): Execution[T] = Execution.from(t)
194194
override def map[T, U](e: Execution[T])(fn: T => U): Execution[U] = e.map(fn)
195195
override def flatMap[T, U](e: Execution[T])(fn: T => Execution[U]): Execution[U] = e.flatMap(fn)
196+
override def join[T, U](t: Execution[T], u: Execution[U]): Execution[(T, U)] = t.zip(u)
196197
}
197198

198199
trait EvalCache { self =>
@@ -399,6 +400,13 @@ object Execution {
399400
case Success(s) => Future.successful(s)
400401
case Failure(err) => Future.failed(err)
401402
}
403+
404+
/**
405+
* This creates a definitely failed Execution.
406+
*/
407+
def failed(t: Throwable): Execution[Nothing] =
408+
fromFuture(_ => Future.failed(t))
409+
402410
/**
403411
* This makes a constant execution that runs no job.
404412
* Note this is a lazy parameter that is evaluated every

scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import cascading.operation.filter._
2727
import cascading.tuple._
2828
import cascading.cascade._
2929

30-
import scala.util.Random
30+
import java.util.Random // this one is serializable, scala.util.Random is not
3131
import scala.collection.JavaConverters._
3232

3333
object JoinAlgorithms extends java.io.Serializable {

scalding-core/src/main/scala/com/twitter/scalding/Operations.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ package com.twitter.scalding {
466466
}
467467
}
468468

469-
class SampleWithReplacement(frac: Double, val seed: Int = new scala.util.Random().nextInt) extends BaseOperation[Poisson]()
469+
class SampleWithReplacement(frac: Double, val seed: Int = new java.util.Random().nextInt) extends BaseOperation[Poisson]()
470470
with Function[Poisson] with ScaldingPrepare[Poisson] {
471471
override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[Poisson]) {
472472
super.prepare(flowProcess, operationCall)

scalding-core/src/main/scala/com/twitter/scalding/Stats.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package com.twitter.scalding
22

33
import cascading.flow.{ FlowDef, FlowProcess }
44
import cascading.stats.CascadingStats
5-
import java.util.{ Collections, WeakHashMap }
5+
import java.util.concurrent.ConcurrentHashMap
66
import org.slf4j.{ Logger, LoggerFactory }
77
import scala.collection.JavaConversions._
88
import scala.collection.JavaConverters._
@@ -109,7 +109,7 @@ object RuntimeStats extends java.io.Serializable {
109109
@transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)
110110

111111
private val flowMappingStore: mutable.Map[String, WeakReference[FlowProcess[_]]] =
112-
Collections.synchronizedMap(new WeakHashMap[String, WeakReference[FlowProcess[_]]])
112+
new ConcurrentHashMap[String, WeakReference[FlowProcess[_]]]
113113

114114
def getFlowProcessForUniqueId(uniqueId: UniqueID): FlowProcess[_] = {
115115
(for {

scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import cascading.flow.FlowDef
2828
import cascading.pipe.{ Each, Pipe }
2929
import cascading.tap.Tap
3030
import cascading.tuple.{ Fields, Tuple => CTuple, TupleEntry }
31-
import util.Random
31+
import java.util.Random // prefer to scala.util.Random as this is serializable
3232

3333
import scala.concurrent.Future
3434

@@ -400,7 +400,7 @@ trait TypedPipe[+T] extends Serializable {
400400
*/
401401
def sample(percent: Double, seed: Long): TypedPipe[T] = {
402402
// Make sure to fix the seed, otherwise restarts cause subtle errors
403-
val rand = new Random(seed)
403+
lazy val rand = new Random(seed)
404404
filter(_ => rand.nextDouble < percent)
405405
}
406406

version.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version in ThisBuild := "0.13.0"
1+
version in ThisBuild := "0.13.1"

0 commit comments

Comments
 (0)