博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
大数据spark计算引擎快速入门
阅读量:6620 次
发布时间:2019-06-25

本文共 5329 字,大约阅读时间需要 17 分钟。

hot3.png

 

spark快速入门

  spark框架是用scala写的,运行在Java虚拟机(JVM)上。支持Python、Java、Scala或R多种语言编写客户端应用。

下载Spark

  访问http://spark.apache.org/downloads.html选择预编译的版本进行下载。

解压Spark

  打开终端,将工作路径转到下载的spark压缩包所在的目录,然后解压压缩包。 可使用如下命令: cd ~ tar -xf spark-2.2.2-bin-hadoop2.7.tgz -C /opt/module/ cd spark-2.2.2-bin-hadoop2.7 ls   注:tar命令中x标记指定tar命令执行解压缩操作,f标记指定压缩包的文件名。

spark主要目录结构

  • README.md

  包含用来入门spark的简单使用说明 - bin

  包含可用来和spark进行各种方式交互的一系列可执行文件 - core、streaming、python

  包含spark项目主要组件的源代码 - examples

  包含一些可查看和运行的spark程序,对学习spark的API非常有帮助

运行案例及交互式Shell

运行案例

./bin/run-example SparkPi 10

scala shell

``` ./bin/spark-shell --master local[2]

# --master选项指定运行模式。local是指使用一个线程本地运行;local[N]是指使用N个线程本地运行。 ```

python shell

./bin/pyspark --master local[2]

R shell

./bin/sparkR --master local[2]

提交应用脚本

```

支持多种语言提交

./bin/spark-submit examples/src/main/python/pi.py 10 ./bin/spark-submit examples/src/main/r/dataframe.R ... ```

使用spark shell进行交互式分析

scala

  使用spark-shell脚本进行交互式分析。

基础

``` scala> val textFile = spark.read.textFile("README.md") textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala> textFile.count() // Number of items in this Dataset res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset res1: String = # Apache Spark

使用filter算子返回原DataSet的子集

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

拉链方式

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15 ```

进阶

```

使用DataSet的转换和动作查找最多单词的行

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15 ```

```

统计单词个数

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count() wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...) ```

python

  使用pyspark脚本进行交互式分析

基础

```

textFile = spark.read.text("README.md")

textFile.count() # Number of rows in this DataFrame 126

textFile.first() # First row in this DataFrame Row(value=u'# Apache Spark')

filter过滤

linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

拉链方式

textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"? 15 ```

进阶

```

查找最多单词的行

from pyspark.sql.functions import *

textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect() [Row(max(numWords)=15)]

统计单词个数

wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

wordCounts.collect() [Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...] ```

独立应用

  spark除了交互式运行之外,spark也可以在Java、Scala或Python的独立程序中被连接使用。   独立应用与shell的主要区别在于需要自行初始化SparkContext。

scala

分别统计包含单词a和单词b的行数

``` /* SimpleApp.scala */ import org.apache.spark.sql.SparkSession

object SimpleApp { def main(args: Array[String]) { val logFile = "YOURSPARKHOME/README.md" // Should be some file on your system val spark = SparkSession.builder.appName("Simple Application").getOrCreate() val logData = spark.read.textFile(logFile).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println(s"Lines with a: $numAs, Lines with b: $numBs") spark.stop() } } ``` 运行应用

```

Use spark-submit to run your application

$ YOURSPARKHOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.11/simple-project_2.11-1.0.jar ... Lines with a: 46, Lines with b: 23 ```

java

分别统计包含单词a和单词b的行数

``` /* SimpleApp.java */ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.Dataset;

public class SimpleApp { public static void main(String[] args) { String logFile = "YOURSPARKHOME/README.md"; // Should be some file on your system SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate(); Dataset logData = spark.read().textFile(logFile).cache();

long numAs = logData.filter(s -> s.contains("a")).count();long numBs = logData.filter(s -> s.contains("b")).count();System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);spark.stop();

} } ``` 运行应用

```

Use spark-submit to run your application

$ YOURSPARKHOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/simple-project-1.0.jar ... Lines with a: 46, Lines with b: 23 ```

python

分别统计包含单词a和单词b的行数

setup.py脚本添加内容 install_requires=[ 'pyspark=={site.SPARK_VERSION}' ]

``` """SimpleApp.py""" from pyspark.sql import SparkSession

logFile = "YOURSPARKHOME/README.md" # Should be some file on your system spark = SparkSession.builder().appName(appName).master(master).getOrCreate() logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop() ``` 运行应用

```

Use spark-submit to run your application

$ YOURSPARKHOME/bin/spark-submit \ --master local[4] \ SimpleApp.py ... Lines with a: 46, Lines with b: 23 ```

 

转载于:https://my.oschina.net/u/3980693/blog/2878787

你可能感兴趣的文章
学习MSCOREE.dll是托管程序的入口点
查看>>
bbc--平台点击进入详情页配置
查看>>
ORACLE存储过程 练习系列六 关键字 分页查询某个方案下的建表语句
查看>>
JavaScript设计模式 代理模式
查看>>
Uiautomator 2.0之UiDevice新增API学习小记
查看>>
在MS Test中如何测试private方法
查看>>
.net4.0中json时间转换问题
查看>>
反射+特性打造简洁的AJAX调用
查看>>
挤牛奶
查看>>
给年轻程序员的几句话
查看>>
重新评估团队贡献分
查看>>
python 和 Ajax相结合的一些资源
查看>>
解决git clone over https 401 error
查看>>
Java 使用单例模式的注意事项
查看>>
VB IobjectSafety接口
查看>>
浅析c#内存泄漏
查看>>
nginx 反向代理TCP mysql
查看>>
[转]【视觉 SLAM-2】 视觉SLAM- ORB 源码详解 2
查看>>
[leetcode-455-Assign Cookies]
查看>>
【转】protobuf2.5.0在<delete [] elements_;>crash的问题。
查看>>