Python: Spark 学习笔记

python scott 557℃ 0评论

Spark 学习笔记

Spark作为分布式系统框架的后起之秀(据说)各方面都要优于hadoop,有本地缓存,原生支持潮潮的scala, 对python的支持强大到直接可以用ipython notebook 作为交互界面。 还有几个正在开发中的分支框架分别用于机器学习和社交网络分析,简直华丽的飞起。

安装

根据阿帕奇基金会的尿性,应该是没有所谓的安装一说的,下载,解压就能使用了,当然不要忘了设置环境变量,具体不啰嗦.

使用

spark全面支持scala和java,部分支持python。好在scala也是脚本语言,所以相对比hadoop方便的多,先来看看两种交互模式。

pyspark

命令行使用 pyspark以后terminal中就会出现个大大SPARK图案和熟悉的python默认交互界面,这太丑了有啥更优秀的没?

利用 PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark呼出ipython交互界面,这个比原来的好多了。

利用

1
 PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline>" pyspark

调用ipython notebook

随便进去个试试示例的程序,还是用WordCount好了

1
2
3
4
file = sc.textFile("hdfs://localhost:9000/user/huangsizhe/input/README.md>")

count = file.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
count.saveAsTextFile("hdfs://localhost:9000/user/huangsizhe/wc>")

成功输出~

对了,为了避免hadoop警告建议把native lib 屏蔽了

spark-shell

这个是最常用的工具了,开发语言是scala,scala的话可以用脚本也可以编译执行,我觉得还是脚本方便

使用写的脚本方法是


1
$spark-shell -i XXX.scala

这样就会一条一条的执行了

还是WordCount

WordCount.scala放在$SPARKPATH/scala目录下

1
2
3
val file = sc.textFile("hdfs://localhost:9000/user/huangsizhe/input/README.md>")
val counts = file.flatMap(line => line.split(" >")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://localhost:9000/user/huangsizhe/wc>")

执行


1
$spark-shell -i WordCount.scala

成功!

spark-submit

这个就有点像hadoop了,一般用于写app,可以使用python,java,scala来写程序,python不用编译, java需要用maven编译成jar文件,scala也需要用sbt编译成jar文件。

python例子:

文件结构如下


1
2
3
4
WordCountpy |
            |-input
            |-script|
                    |-WordCount.py

WordCount.py如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import sys
from operator import add
from pyspark import SparkContext
if __name__ == "__main__>":
    if len(sys.argv) != 3:
        print >> sys.stderr, "Usage: wordcount <input> <output>>"
        exit(-1)
    sc = SparkContext(appName="PythonWordCount>")
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print "%s: %i>" % (word, count)
    counts.saveAsTextFile(sys.argv[2])
    sc.stop()

cd 到WordCount项目根目录下运行: spark-submit –master local[4] script/WordCount.py input/TheMostDistantWayInTheWorld.txt output 然后就会多出一个output文件夹,里面存有结果~

scala例子

java的我就不写了。。。

scala例子文件结构如下: WordCountScala | |-input| | |-TheMostDistantWayInTheWorld.txt | |-src| | |-wordcountApp.scala | |-wordcountAPP.sbt

wordcountApp.scala如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object WordCountApp {
    def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: WordCountApp <input> <output>>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("WordCountAPP>")
    val sc = new SparkContext(conf)
    val file = sc.textFile(args(0))
    val counts = file.flatMap(line => line.split(" >")).map(word => (word,1)).reduceByKey(_+_)
    println(counts)
    counts.saveAsTextFile(args(1))
    }
    }

wordcountAPP.sbt如下:

1
2
3
4
5
6
7
name := "wordcount>"

version := "1.0>"

scalaVersion := "2.10.4>"

libraryDependencies += "org.apache.spark>" %% "spark-core>" % "1.2.0>"

编译:

cd 到 WordCountScala项目文件夹下


1
2
$ find .
$ sbt package

编译成功你编译完的程序就在 target/scala-2.10/simple-project_2.10-1.0.jar

运行:

1
2
3
spark-submit   --class "WordCountApp>"  \
 --master local[4]   \
 target/scala-2.10/wordcount_2.10-1.0.jar input/TheMostDistantWayInTheWorld.txt output

你就能看到output了

Streaming spark的流式计算系统

这个stream很牛,实时运算,似乎是strom的替代品,试试官方例子 Networkwordcount.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__>":
    if len(sys.argv) != 3:
        print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>>"
        exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount>")
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" >"))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

运行

  1. nc -lk 9999打开端口监听
  2. 新开个terminal,cd 到该脚本所在地址 spark-submit network_wordcount.py localhost 9999
  3. 在nc -lk 那个terminal下输入句子试试~ 可以在另一个terminal中看到每次回车后该行的单词次数~

原文:http://www.ituring.com.cn/article/198895

转载请注明:osetc.com » Python: Spark 学习笔记

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址