Getting Started with PySpark
<pre>
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

import pyspark.sql.functions as F
</pre>
Load some data
<pre>
df = spark.read.load("DEX03s - 2019-10-07.csv",
                     format="csv", sep=",", inferSchema="true", header="true")
</pre>
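Before cleaning anything, it can help to confirm what the schema inference actually produced. A minimal sanity-check sketch on the df loaded above (not part of the original steps):
<pre>
# Inspect the inferred schema and a few rows to verify inferSchema/header did what we expect.
df.printSchema()
df.show(5, truncate=False)
print(df.count(), "rows,", len(df.columns), "columns")
</pre>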
Find columns that are more than 90% null
<pre>
threshold = df.count() * .90
null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
to_drop = [k for k, v in null_counts.items() if v >= threshold]
</pre>
Drop Null columns
<pre>
cleanDF = df.drop(*to_drop)   # named cleanDF so the later steps can reference it
display(cleanDF)              # display() is a Databricks notebook helper; use cleanDF.show() elsewhere
</pre>
Create a subset of records
<pre>
subsetDF = cleanDF.limit(100).select("COMMENT_DESC")
map = { 'zip': ['ZIP'], 'moved': ['MOVED'], 'apt': ['APT'], 'box': ['P O BOX'], 'street': ['STREET', 'ADDRESS'] }
print(subsetDF.count())
subsetDF.show()
</pre>
This wasn't working at first; the intent is to derive a new ERROR_CLASSIFICATION column from COMMENT_DESC using a custom UDF. The failure isn't that functions are disallowed inside a UDF (plain Python functions are fine there). The bugs were that getErrorType returned errorString.upper() before the search loop ever ran, and that errorString.find(searchterm) is truthy even on a miss (find returns -1). The corrected version:
<pre>
from pyspark.sql.functions import count, col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def getErrorType(errorString):
    # Guard against null comments, then normalize case before searching.
    if errorString is None:
        return 'unknown'
    errorString = errorString.upper()
    map = { 'zip': ['ZIP'], 'moved': ['MOVED'], 'apt': ['APT'], 'box': ['P O BOX'], 'street': ['STREET', 'ADDRESS'] }
    for key, searchlist in map.items():
        for searchterm in searchlist:
            if searchterm in errorString:   # str.find() returns -1 (truthy) on a miss, so use "in" instead
                return key
    return 'unknown'

def dummy_function(data_str):
    cleaned_str = 'dummyData'
    return cleaned_str

myudf = udf(getErrorType, StringType())

subsetDF = ( cleanDF
    .limit(10)
    .select("COMMENT_DESC")
    .withColumn('ERROR_CLASSIFICATION', myudf(col('COMMENT_DESC')))
)
print(subsetDF.count())
subsetDF.show()
</pre>
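The same classification can also be expressed without a UDF, using only built-in column functions, which keeps the work inside Spark's SQL engine and avoids the Python worker round-trip. This is an alternative sketch, not the approach above; it assumes the same cleanDF and search terms:
<pre>
from functools import reduce
from pyspark.sql import functions as F

search_map = { 'zip': ['ZIP'], 'moved': ['MOVED'], 'apt': ['APT'], 'box': ['P O BOX'], 'street': ['STREET', 'ADDRESS'] }

# Build a chained when()/otherwise() expression; iterating in reverse makes the
# first dict entry the outermost when(), so the first matching key wins.
upper_desc = F.upper(F.col('COMMENT_DESC'))
classification = F.lit('unknown')
for key, terms in reversed(list(search_map.items())):
    cond = reduce(lambda a, b: a | b, [upper_desc.contains(t) for t in terms])
    classification = F.when(cond, key).otherwise(classification)

subsetDF = ( cleanDF
    .limit(10)
    .select('COMMENT_DESC')
    .withColumn('ERROR_CLASSIFICATION', classification)
)
subsetDF.show()
</pre>
Null COMMENT_DESC values make every contains() condition null, which when() treats as false, so they fall through to 'unknown' without any explicit guard.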