博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
pyspark 内容介绍(一)
阅读量:6657 次
发布时间:2019-06-25

本文共 9971 字,大约阅读时间需要 33 分钟。

 

pyspark 包介绍

子包

内容

PySpark是针对Spark的Python API。根据网上提供的资料,现在汇总一下这些类的基本用法,并举例说明如何具体使用。也是总结一下经常用到的这些公有类的使用方式。方便初学者查询及使用。

Public 类们:

  • :

    Spark 功能的主入口。

  • :

    弹性分布式数据集,就是在Spark中的基础抽象

  • :

    一个在task之间重用的广播变量。

  • :

    一个“add-only” 共享变量,task只能增加值。

  • :

    用于配置Spark.

  • :

    在job中访问文件。

  • :

    更细粒度的缓存持久化级别。

     

将分为两篇介绍这些类的内容,这里首先介绍SparkConf
1. class
pyspark.
SparkConf
(
loadDefaults=True,
_jvm=None,
_jconf=None
)

配置一个Spark应用,一般用来设置各种Spark的键值对作为参数。

大多数时候,使用来创建SparkConf对象,也用于载入来自spark.* Java系统的属性值。此时,在对象上设置的任何参数都有高于系统属性的优先级。

对于单元测试,也能调用SparkConf(false)来略过额外的配置,无论系统属性是什么都可以获得相同的配置。

这个类中的设值方法都是支持链式结构的,例如,你可以这样编写配置conf.setMaster(“local”).setAppName(“My app”)

注意:

一旦SparkConf对象被传递给Spark,它就被复制并且不能被其他人修改。

contains
(
key
)

配置中是否包含一个指定键。

get
(
key,
defaultValue=None
)

获取配置的某些键值,或者返回默认值。

getAll
(
)

得到所有的键值对的list。

set
(
key,
value
)

设置配置属性。

setAll
(
pairs
)

通过传递一个键值对的list,为多个参数赋值。

etAppName
(
value
)

设置应用名称

setExecutorEnv
(
key=None,
value=None,
pairs=None
)

设置环境变量复制给执行器。

setIfMissing
(
key,
value
)

如果没有,则设置一个配置属性。

setMaster
(
value
)

设置主连接地址。

setSparkHome
(
value
)

设置工作节点上的Spark安装路径。

toDebugString
(
)

返回一个可打印的配置版本。

2. class
pyspark.
SparkContext
(
master=None,
appName=None,
sparkHome=None,
pyFiles=None,
environment=None,
batchSize=0,
serializer=PickleSerializer(),
conf=None,
gateway=None,
jsc=None,
profiler_cls=<class 'pyspark.profiler.BasicProfiler'>
)

Spark功能的主入口,SparkContext 代表到Spark 集群的连接,并且在集群上能创建RDD和broadcast。

PACKAGE_EXTENSIONS
= ('.zip', '.egg', '.jar')
accumulator
(
value,
accum_param=None
)

用指定的初始化值创建一个累加器。使用AccumulatorParam对象定义如何添加数据类型的值。默认AccumulatorParams为整型和浮点型。如果其他类型需要自定义。

addFile
(
path,
recursive=False
)

使用在每个节点上的Spark job添加文件下载。这里path 参数可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者URI。

在Spark的job中访问文件,使用L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>}可以找到下载位置。

如果递归选项被设置为“TRUE”则路径能被指定。当前路径仅仅支持Hadoop文件系统。

1 >>> from pyspark import SparkFiles 2 >>> path = os.path.join(tempdir, "test.txt") 3 >>> with open(path, "w") as testFile: 4 ...    _ = testFile.write("100")  5 >>> sc.addFile(path) 6 >>> def func(iterator): 7 ...    with open(SparkFiles.get("test.txt")) as testFile: 8 ...        fileVal = int(testFile.readline()) 9 ...        return [x * fileVal for x in iterator]10 >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()11 [100, 200, 300, 400]

 

addPyFile
(
path
)

为所有将在SparkContext上执行的任务添加一个a.py或者.zip的附件。这里path 参数可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者FTP URI。

applicationId

Spark应用的唯一ID,它的格式取决于调度器实现。

  • 本地模式下像这样的ID‘local-1433865536131’
  • 模式下像这样的ID‘application_1433865536131_34483’
>>> sc.applicationId  u'local-...'

 

binaryFiles
(
path,
minPartitions=None
)

注意

  • 从HDFS上读取二进制文件的路径,本地文件系统(在所有节点上都可用),或者其他hadoop支持的文件系统URI党组偶一个二进制数组。每个文件作为单独的记录,并且返回一个键值对,这个键就是每个文件的了路径,值就是每个文件的内容。
  • 小文件优先选择,大文件也可以,但是会引起性能问题。
binaryRecords
(
path,
recordLength
)
  • path – 输入文件路径
  • recordLength – 分割记录的长度(位数)
注意

从平面二进制文件中载入数据,假设每个记录都是一套指定数字格式的数字(ByteBuffer),并且每个记录位数的数是恒定的。

broadcast
(
value
)

广播一个制度变量到集群,返回一个L{Broadcast<pyspark.broadcast.Broadcast>} 对象在分布式函数中读取。这个变量将只发一次给每个集群。

cancelAllJobs
(
)

取消所有已排程的或者正在运行的job。

cancelJobGroup
(
groupId
)

 

取消指定组的已激活job,查看更多信息。

 

defaultMinPartitions

当不被用户指定时,默认Hadoop RDDs 为最小分区。

defaultParallelism

当不被用户指定时,默认并行级别执行。(例如reduce task)

dump_profiles
(
path
)

转存配置信息到目录路径下。

emptyRDD
(
)

创建没有分区或者元素的RDD。

getConf
(
)
getLocalProperty
(
key
)

在当前线程中得到一个本地设置属性。

classmethod
getOrCreate
(
conf=None
)
参数:conf – SparkConf (optional)

获取或者实例化一个SparkContext并且注册为单例模式对象。

hadoopFile
(
path,
inputFormatClass,
keyClass,
valueClass,
keyConverter=None,
valueConverter=None,
conf=None,
batchSize=0
)、

用任意来自HDFS的键和值类读取一个老的Hadoop输入格式,本地系统(所有节点可用),或者任何支持Hadoop的文件系统的URI。这个机制是与sc.sequenceFile是一样的。

Hadoop 配置可以作为Python的字典传递。这将被转化成Java中的配置。

参数:

  • path – Hadoop文件路径
  • inputFormatClass – 输入的Hadoop文件的规范格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
  • keyClass – 可写键类的合格类名 (例如“org.apache.hadoop.io.Text”)
  • valueClass –可写值类的合格类名 (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (默认为none)
  • valueConverter – (默认为none)
  • conf – Hadoop配置,作为一个字典传值 (默认为none)
  • batchSize – Python对象的数量代表一个单一的JAVA对象 (默认 0, 表示自动匹配batchSize)
hadoopRDD
(
inputFormatClass,
keyClass,
valueClass,
keyConverter=None,
valueConverter=None,
conf=None,
batchSize=0
)

读取Hadoop输入格式用任意键值类。与上面的类相似。

参数:

  • inputFormatClass – 输入的Hadoop文件的规范格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
  • keyClass – 可写键类的合格类名 (例如“org.apache.hadoop.io.Text”)
  • valueClass –可写值类的合格类名 (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (默认为none)
  • valueConverter – (默认为none)
  • conf – Hadoop配置,作为一个字典传值 (默认为none)
  • batchSize – Python对象的数量代表一个单一的JAVA对象 (默认 0, 表示自动匹配batchSize)
newAPIHadoopFile
(
path,
inputFormatClass,
keyClass,
valueClass,
keyConverter=None,
valueConverter=None,
conf=None,
batchSize=0
)

与上面的功能类似.

newAPIHadoopRDD
(
inputFormatClass,
keyClass,
valueClass,
keyConverter=None,
valueConverter=None,
conf=None,
batchSize=0
)

任意Hadoop的配置作为参数传递。

parallelize
(
c,
numSlices=None
)

分配一个本Python集合构成一个RDD。如果输入代表了一个性能范围,建议使用xrange。

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()[[0], [2], [3], [4], [6]]>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()[[], [0], [], [2], [4]]

 

pickleFile
(
name,
minPartitions=None
)

载入使用方法保存的RDD。

>>> tmpFile = NamedTemporaryFile(delete=True)>>> tmpFile.close()>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

 

range
(
start,
end=None,
step=1,
numSlices=None
)

创建一个int类型元素组成的RDD,从开始值到结束(不包含结束),里面都是按照步长增长的元素。这就要用到Python内置的函数range()。如果只有一个参数调用,这个参数就表示结束值,开始值默认为0.

参数:

  • start –起始值
  • end – 结束值(不包含)
  • step – 步长(默认: 1)
  • numSlices –RDD分区数量(切片数)

返回值:RDD

>>> sc.range(5).collect()[0, 1, 2, 3, 4]>>> sc.range(2, 4).collect()[2, 3]>>> sc.range(1, 7, 2).collect()[1, 3, 5]

 

runJob
(
rdd,
partitionFunc,
partitions=None,
allowLocal=False
)

执行指定的partitionFunc 在指定的分区,返回一个元素数组。如果不指定分区,则将运行在所有分区上。

>>> myRDD = sc.parallelize(range(6), 3)>>> sc.runJob(myRDD, lambda part: [x * x for x in part])[0, 1, 4, 9, 16, 25]>>> myRDD = sc.parallelize(range(6), 3)>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)[0, 1, 16, 25]

 


 
  • sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)
  • 读取Hadoop 的SequenceFile,机制如下:

    1.一个Java RDD通过SequenceFile或者其他输入格式创建,需要键值的可写类参数。

    2.序列化

    3.如果失败,则对每个键值调用‘toString’。

    4.在Python上,用来反序列化。

参数:

path –序列化文件路径

keyClass – 可用键类(例如 “org.apache.hadoop.io.Text”)

valueClass – 可用值类 (例如 “org.apache.hadoop.io.LongWritable”)

keyConverter

valueConverter

minSplits – 数据集最低分割数(默认 min(2, sc.defaultParallelism))

batchSize – 代表一个JAVA对象Python对象的数量 (默认0, 自动)

 

setCheckpointDir(dirName)

 

设定作为检查点的RDD的目录,如果运行在集群上,则目录一定时HDFS路径。

setJobGroup
(
groupId,
description,
interruptOnCancel=False
)

分配一个组ID给所有被这个线程开启的job。

通常,一个执行单位由多个Spark 的action或者job组成。应用程序可以将所有把所有job组成一个组,给一个组的描述。一旦设置好,Spark的web UI 将关联job和组。

应用使用来取消组。

>>> import threading>>> from time import sleep>>> result = "Not Set">>> lock = threading.Lock()>>> def map_func(x):...     sleep(100)...     raise Exception("Task should have been cancelled")>>> def start_job(x):...     global result...     try:...         sc.setJobGroup("job_to_cancel", "some description")...         result = sc.parallelize(range(x)).map(map_func).collect()...     except Exception as e:...         result = "Cancelled"...     lock.release()>>> def stop_job():...     sleep(5)...     sc.cancelJobGroup("job_to_cancel")>>> supress = lock.acquire()>>> supress = threading.Thread(target=start_job, args=(10,)).start()>>> supress = threading.Thread(target=stop_job).start()>>> supress = lock.acquire()>>> print(result)Cancelled

 

如果对于job组,interruptOnCancel被设定为True,那么那么取消job将在执行线程中调用Thread.interrupt()。这对于确保任务实时停止是有作用的。但是默认情况下,HDFS可以通过标记节点为dead状态来停止线程。

setLocalProperty
(
key,
value
)

设定本地影响提交工作的属性,例如Spark 公平调度池。

setLogLevel
(
logLevel
)

控制日志级别。重写任何用户自定义的日志设定。有效的日志级别包括:ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN。

classmethod
setSystemProperty
(
key,
value
)

设定Java系统属性,例如spark.executor.memory,这一定要在实例化SparkContext之前被激活。

show_profiles
(
)

打印配置信息到标准输出。

sparkUser
(
)

为运行SparkContext 的用户获得SPARK_USER

startTime

当SparkContext被发起,则返回新的时间纪元。

statusTracker
(
)

Return object

返回对象

stop
(
)

关闭SparkContext。

textFile
(
name,
minPartitions=None,
use_unicode=True
)

从HDFS中读取一个text文件,本地文件系统(所有节点可用),或者任何支持Hadoop的文件系统的URI,然后返回一个字符串类型的RDD。

如果用户use_unicode为False,则strings类型将为str(用utf-8编码),这是一种比unicode更快、更小的编码(Spark1.2以后加入)。

>>> path = os.path.join(tempdir, "sample-text.txt")>>> with open(path, "w") as testFile:...    _ = testFile.write("Hello world!")>>> textFile = sc.textFile(path)>>> textFile.collect()[u'Hello world!']

 

uiWebUrl

返回由SparkContext的SparkUI实例化开启的URL。

union
(
rdds
)

建立RDD列表的联合。

支持不同序列化格式的RDD的unions()方法,需要使用默认的串行器将它们强制序列化(串行化):

>>> path = os.path.join(tempdir, "union-text.txt")>>> with open(path, "w") as testFile:...    _ = testFile.write("Hello")>>> textFile = sc.textFile(path)>>> textFile.collect()[u'Hello']>>> parallelized = sc.parallelize(["World!"])>>> sorted(sc.union([textFile, parallelized]).collect())[u'Hello', 'World!']

 

version

应用运行的Spark的版本。

wholeTextFiles
(
path,
minPartitions=None,
use_unicode=True
)

读取HDFS的文本文件的路径,这是一个本地文件系统(所有节点可用),或者任何支持Hadoop的文件系统的URI。每个文件被当做一个独立记录来读取,然后返回一个键值对,键为每个文件的路径,值为每个文件的内容。

如果用户use_unicode为False,则strings类型将为str(用utf-8编码),这是一种比unicode更快、更小的编码(Spark1.2以后加入)。

举例说明,如果有如下文件:

hdfs://a-hdfs-path/part-00000hdfs://a-hdfs-path/part-00001...hdfs://a-hdfs-path/part-nnnnn

如果执行 rdd = sparkContext.wholeTextFiles(“hdfs://a-hdfs-path”), 那么rdd 包含:

(a-hdfs-path/part-00000, its content)(a-hdfs-path/part-00001, its content)...(a-hdfs-path/part-nnnnn, its content)

注意

这种情况适合小文件,因为每个文件都会被载入到内存中。消耗很多内存啊!

>>> dirPath = os.path.join(tempdir, "files")>>> os.mkdir(dirPath)>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:...    _ = file1.write("1")>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:...    _ = file2.write("2")>>> textFiles = sc.wholeTextFiles(dirPath)>>> sorted(textFiles.collect())[(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]

 

本篇接少了两个类SparkContextSparkConf,下一篇将会介绍其余的几个类的内容,这是一篇汇总性质的文章主要便于以后使用时知道具体类中的方法调用为刚刚接触Spark和我差不多人提供参考。还有理解不到位的请多多理解。

转载地址:http://tpqto.baihongyu.com/

你可能感兴趣的文章
使用PyQt来编写第一个Python GUI程序
查看>>
C/C++ 全局变量和局部变量的大小限制
查看>>
PMD代码检查工具
查看>>
(国内)完美下载Android源码Ubuntu版
查看>>
第一章(3)——执行计划重用
查看>>
脑机接口公司Neuralink融资2700万美元,马斯克否认
查看>>
rbd块映射
查看>>
阿里巴巴云栖大会发布“云栖简史” 七年推动中国互联网大爆发
查看>>
duilib 给List表头增加百分比控制宽度的功能
查看>>
背水一战 Windows 10 (14) - 动画: 线性动画, 关键帧动画
查看>>
半全局立体匹配方法调研,以及一些立体匹配方向的思考
查看>>
FU-A分包方式,以及从RTP包里面得到H.264数据和AAC数据的方法
查看>>
快递更快了!双11传统物流企业与阿里云的“云端智能”
查看>>
数据说话,锅别都让程序员背
查看>>
5G让万物互联成为可能 大连接时代谋划物联网
查看>>
【数据结构6】图
查看>>
[WCF权限控制]基于Windows用户组的授权方式[下篇]
查看>>
java-Atomic包
查看>>
查找数组中第二大的数值
查看>>
Spring进行TestNG测试中无法插入、删除数据库数据(access)的解决
查看>>