如何用 Python 编写 Spark UDF(用户定义函数)?
原文:https://www . geesforgeks . org/如何编写-spark-UDF-用户定义-python 中的函数/
在本文中,我们将讨论 UDF(用户定义函数)以及如何用 Python Spark 编写这些函数。UDF,基本上代表用户定义函数。UDF 将允许我们直接在 python 中的数据框和 SQL 数据库中应用这些函数,而无需单独注册它们。它还可以帮助我们为数据框创建新列,通过 UDF 向数据框列应用一个函数,因此它将扩展我们的数据框功能。它可以使用 udf()方法创建。
udf(): 这个方法会使用 lambda 函数来循环数据,它的参数会接受 lambda 函数,而 lambda 值会变成函数的参数,我们要做成一个 udf。
Pyspark 数据框示例
让我们创建一个数据框,这个数据框的主题是学生的名字,以及他/她在 100 分测试中的原始分数。
Python 3
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType,StringType
from pyspark.sql.functions import udf
spark = SparkSession.builder.appName('UDF PRACTICE').getOrCreate()
cms = ["Name","RawScore"]
data = [("Jack", "79"),
("Mira", "80"),
("Carter", "90")]
df = spark.createDataFrame(data=data,schema=cms)
df.show()
输出:
创建示例函数
现在,我们要做一个函数。因此,为了便于理解,我们将做一个简单的函数来拆分列,并检查,如果该列中的遍历对象(变得等于‘J’(大写 J)或‘C’(大写 C)或‘M’(大写 M),那么它将转换该单词的第二个字母及其大写版本。该代码的实现是:
Python 3
def Converter(str):
result = ""
a = str.split(" ")
for q in a:
if q == 'J' or 'C' or 'M':
result += q[1:2].upper()
return result
用样本函数制作 UDF
现在,我们将把它转换成我们的 UDF 函数,这反过来将减少我们的数据工作量。为此,我们在 UDF 内部使用了 lambda。
Python 3
NumberUDF = udf(lambda m: Converter(m))
在数据帧上使用 UDF
接下来我们将使用 withcolumn(),记住 withcolumn()将返回完整的数据帧。所以我们将只使用我们现有的 df 数据帧,返回值将只存储在 df 中(基本上我们将附加它)。
Python 3
df.withColumn("Special Names", NumberUDF("Name")).show()
输出:
注:我们也可以一步完成所有这些东西。
带注释的 UDF
现在,一个简单而聪明的方法是使用“ANNOTATIONS”(或装饰者)。这将在较少的步骤中创建我们的 UDF 函数。为此,我们所要做的就是在 udf 函数前面使用@符号( decorator) ,并在它的参数部分给出函数的返回类型,即把 return type 赋值为 Intergertype(),StringType(),等等。
Python 3
@udf(returnType=StringType())
def Converter(str):
result = ""
a = str.split(" ")
for q in a:
if q == 'J' or 'C' or 'M':
result += q[1:2].upper()
else:
result += q
return result
df.withColumn("Special Names", Converter("Name")) \
.show()
输出:
示例:
现在,让我们假设学校有一个评分方案,根据平方根加 3 来校准学生的分数(即他们将校准 15 分中的分数)。因此,我们将定义一个 UDF 函数,这次我们将指定返回类型。即浮点数据类型。因此,该函数的声明将是–
Python 3
def SQRT(x):
return float(math.sqrt(x)+3)
现在,我们将定义一个 udf,它的返回类型将始终是 float,也就是说,我们强制该函数以及 UDF 只给我们浮点数形式的结果。该功能的定义将是–
Python 3
UDF_marks = udf(lambda m: SQRT(m),FloatType())
udf 的第二个参数 floatingtype()将总是强制 UDF 函数只返回 floating type 中的结果。现在,我们将使用我们的 udf 函数,在我们的数据框中的 RawScore 列上标记 UDF,并将生成一个名为“ RawScore”的新列,这将是该列的默认命名。这方面的代码如下所示–
Python 3
df.select("Name","RawScore", UDF_marks("RawScore")).show()
输出:
版权属于:月萌API www.moonapi.com,转载请注明出处