1.和Java 进行交互

在 Scala 程序中,java.lang包下的类是默认全部引入的,其它包下的类则需要显式(explicitly)引入,且Scala 可以和 Java 进行无缝(seemlessly)的交互操作。

1
2
3
4
5
6
7
8
9
import java.util.{Date, Locale}
import java.text.DateFormat
import java.text.DateFormat._ //scala中*是合法标识符,_代表所有
object FrenchDate {
def main(args: Array[String]) {
val now = new Date
val df = getDateInstance(LONG, Locale.FRANCE) println(df format now)
}
}

df format now是一个语法特性,如果一个方法只接受一个参数,那么可以使用 infix 语法,和 df.format(now)的语义完全相同

2.函数是对象

Scala 中,函数(functions)也是对象(objects),所以,函数可以当做参数进行传递,可以把函数存储在变量中,也可以把函数作为其他函数的返回值。这种将函数当做值进行操作的能力,是函数式编程(functional programming)最重要的特性之一。

1
2
3
4
5
6
object Timer {
def oncePerSecond(callback: () => Unit) {
while (true) { callback(); Thread sleep 1000 } }
def timeFlies() {println("time flies like an arrow...")}
def main(args: Array[String]) {oncePerSecond(timeFlies)}
}

上面的程序实现了简单定时器的功能,负责定时的函数(function)名为:oncePerSecond,它接受一个回调函数作为参数,该回调函数的类型记为:() => Unit,代表任何无参数、无返回值的函数(Unit和 C/C++中的 void类似)。程序的 main 方法调用定时函数,作为实参传进去的回调函数 timeFlies,仅仅向终端打印一句话,所以,该程序的实际功能是:每秒钟在屏幕上打印一条信息:time flies like an arrow。

匿名函数

这里的timeFlies函数只被用过一次,也就是当做回调函数传给 oncePerSecond的时候,对于这种函数,在用到的时候即时构造更合理,因为可以省去定义和命名的麻烦,在 Scala 中,这样的函数称为匿名函数(anonymous functions),也就是没有名字的函数。

1
2
3
4
def main(args: Array[String]) {
oncePerSecond(() =>
println("time flies like an arrow..."))
}

代码中的右箭头‘=>’表明程序中存在一个匿名函数,箭头左边是匿名函数的参数列表,右边是函数体。在本例中,参数列表为空(箭头左边是一对空括号),而函数体和改造前定义的 timeFlies 函数保持一致。

3.类

与java不同的是,Scala中的类定义可以带参数。

1
2
3
4
class Complex(real: Double, imaginary: Double) {
def re() = real
def im() = imaginary
}

无参方法

无参方法像访问类属性(fields)一样访问类的方法,使程序更加简洁;和零参方法的差异在于:无参方法在声明和调用时,均无须在方法名后面加括号。def re = real

继承和方法重写

Scala 中重写(overriding)从父类继承的方法,必须使用 override修饰符来显式声明

override def toString() = "" +re + (if (im < 0) "" else "+") + im + "i"

4.条件类和模式匹配

条件类的引入

在 Java 中,建立树形结构最常见的做法是:创建一个表示树的抽象类,然后每
种类型的节点用一个继承自抽象类的子类来表示。而在函数式编程语言中,则可以使用
代数数据类型(algebraic data-type)来达到同样的目的。Scala 则提供了一种介于两者之间(类继承和代数数据类型),被称为条件类(case classes)的概念,下面就是用条件类定义树的示例代码:

1
2
3
abstract class Tree
case class Sum(l: Tree, r: Tree) extends Tree case class Var(n: String) extends Tree
case class Const(v: Int) extends Tree

上例中的Sum, Var和Const就是条件类,它们与普通类的差异主要体现在如下几个方面:

  • 新建条件类的实例,无须使用new关键字

  • 自动为构造函数所带的参数创建对应的 getter 方法(也就是说,如果 c是 Const的实例,通过 c.v即可访问构造函数中的同名参数v的值)

  • 条件类都默认实现equals 、hashCode和toString方法,与java的同名默认实现基本相同

  • 条件类的实例可以通过模式匹配进行分解

例子:算术式求值

既然我们已经定义了用于表示算术表达式的数据结构,接下来我们可以定义作用在这
些数据结构上的操作。首先,我们定义一个在特定环境(environment,上下文)中对表达式进行求值的函数,其中环境的作用是为了确定表达式中的变量的取值。例如:
有一个环境,对变量 x 的赋值为 5,我们记为:{x → 5},那么,在这个环境上求 x + 1的值,得到的结果为 6。

在程序中,环境也需要一种合理的方式来表示。可以使用哈希表(hash table)之类的数据结构,也可以直接使用函数(functions)!实际上,环境就是一个给变量赋予特定值的函数。上面提到的环境:{x → 5},在 Scala 中可以写成:{ case "x" => 5 }

上面这一行代码定义了一个函数,如果给该函数传入一个字符串"x"作为参数,则函数返回整数 5,否则,将抛出异常。同理,定义一个将字符串转为整数的环境可以简化代码,type Environment = String => Int

求值函数:

1
2
3
4
def eval(t: Tree, env: Environment): Int = t match {
case Sum(l, r) => eval(l, env) + eval(r, env) case Var(n) => env(n)
case Const(v) => v
}

模式匹配的过程,实际上就是把一个值和一系列的模式进行比对,如果能够匹配上,则从值中取出有用的部分进行命名,然后用这些命名的部分(作为参数)来驱动另一段代码的执行。

“模式匹配”和“类方法”除了编程风格的差异,也各有利弊,决策者需要根据程序的扩展性需求做出权衡和选择:

  • 使用类方法,添加一种节点类型比较简单,新增一种操作比较麻烦,因为要修改所有子类

  • 使用模式匹配,新增节点类型需要修改所有操作函数,而增加新的操作则只需要增加新的函数

5.Traits(特性) 和Genericity(泛型)

traits可以理解为java中的接口;Genericity即模板。

class Ref[T] {var contents: T = _}

这里赋给变量的初始值_代表各种类型的默认值。

6.RDD

RDD(Resilient Distributed Dataset弹性分布式数据集)是Spark中抽象的数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据时分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作,从而得到结果。

1.创建

RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。

  1. 从普通数组创建RDD,里面包含了1到9这9个数字,它们分别在3个分区中

scala> val a = sc.parallelize(1 to 9, 3)

  1. 读取文件README.md来创建RDD,文件中的每一行就是RDD中的一个元素

scala> val b = sc.textFile("README.md")

2.两类操作算子

主要分两类,转换(transformation)和动作(action)。两类函数的主要区别是,transformation接受RDD并返回RDD,而action接受RDD返回非RDD.

transformation操作是延迟计算的,也就是说从一个RDD生成另一个RDD的转换操作不是马上执行,需要等到有action操作的时候才真正触发运算。

3.部分转换算子

map

对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

举例:

scala> val a = sc.parallelize(1 to 9, 3)

scala> val b = a.map(x => x*2)

scala> a.collect

res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> b.collect

res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

flatMap

与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。

举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

scala> val a = sc.parallelize(1 to 4, 2)

scala> val b = a.flatMap(x => 1 to x)

scala> b.collect

res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4,1,2,3,4)

mapWith

是map的一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;

第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

举例:把partition index 乘以10,然后加上2作为新的RDD的元素。

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)

x.mapWith(a => a * 10)((a, b) => (b + 2)).collect

res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

union,++

将两个RDD中元素直接作为新RDD的元素

1,2,3++4,5,6:1,2,3,4,5,6

对比zip:将两个RDD元素两两组合成tuple,作为新RDD的元素

1,2,3 zip 4,5,6:(1,4),(2,5)(3,6)

4.部分action算子

collect

将数据集的所有元素作为驱动程序中的数组返回,在返回足够小的数据子集的过滤器或其他操作时很有用

foreach(func)

在数据集的每个元素上运行函数func