Getting Started with PySpark

Create a Spark session

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
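
As a quick smoke test (not part of the original notes), confirm the session is live:

print(spark.version)  # prints the Spark version the session is running on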

import pyspark.sql.functions as F

Load some data

df = spark.read.load("DEX03s - 2019-10-07.csv",
                     format="csv", sep=",", inferSchema="true", header="true")

Find columns that are more than 90% null

threshold = df.count() * 0.90
# F.count only counts non-null values, so counting an expression that is
# non-null exactly when the column is null yields each column's null count.
null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
to_drop = [k for k, v in null_counts.items() if v >= threshold]
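
Before dropping anything, the candidates can be sanity-checked against the null_counts already computed (an optional sketch):

total = df.count()
for name in to_drop:
    print(name, round(null_counts[name] / total, 3))  # fraction of rows that are null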

Drop null columns

cleanDF = df.drop(*to_drop)  # drop the mostly-null columns found above
display(cleanDF)
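
A quick before-and-after comparison (optional) confirms how many columns were removed:

print(len(df.columns), "columns before,", len(cleanDF.columns), "after dropping", len(to_drop))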

Create a subset of records

subsetDF = cleanDF.limit(100).select("COMMENT_DESC")

# Keyword lists for classifying comments; the first key whose terms match wins.
error_terms = {'zip': ['ZIP'], 'moved': ['MOVED'], 'apt': ['APT'], 'box': ['P O BOX'], 'street': ['STREET', 'ADDRESS']}

print(subsetDF.count())
subsetDF.show()

This wasn't working as first written. The intent is to derive a new ERROR_CLASSIFICATION column from COMMENT_DESC with a custom UDF. The failure isn't because functions are not allowed inside a UDF; ordinary Python functions run fine there. There were two real bugs: the unconditional return errorString.upper() on the first line made the entire search loop unreachable dead code, and if errorString.find(searchterm) tested the wrong condition, because str.find() returns -1 (which is truthy) when the term is absent and 0 (which is falsy) when the term matches at the start of the string.
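
The inverted test is easy to demonstrate in plain Python:

print("APT 4B".find("ZIP"))   # -1: not found, yet -1 is truthy in a bare if
print("ZIP BAD".find("ZIP"))  # 0: found at position 0, yet 0 is falsy

With those two fixes, plus a guard for null COMMENT_DESC values (a Python UDF receives them as None), the corrected version is: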

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def getErrorType(errorString):
    # Classify a comment by the first error_terms keyword list that matches.
    # A Python UDF receives SQL nulls as None, so guard before calling .upper().
    if errorString is None:
        return 'unknown'
    errorString = errorString.upper()
    for key, searchlist in error_terms.items():
        for searchterm in searchlist:
            if searchterm in errorString:
                return key
    return 'unknown'


def dummy_function(data_str):
    # Trivial stand-in UDF, useful for testing the column wiring on its own
    cleaned_str = 'dummyData'
    return cleaned_str


myudf = udf(getErrorType, StringType())

subsetDF = (cleanDF
            .limit(10)
            .select("COMMENT_DESC")
            .withColumn('ERROR_CLASSIFICATION', myudf(cleanDF['COMMENT_DESC'])))

print(subsetDF.count())
subsetDF.show()
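
The UDF route works, but every row round-trips through a Python worker. The same classification can be pushed down to Spark's built-in column functions instead, which the optimizer handles natively. Below is a sketch of that alternative, reusing the error_terms mapping above; the classifiedDF name is just for illustration, and coalesce picks the first category whose terms match, falling back to 'unknown':

from functools import reduce
import pyspark.sql.functions as F

upper_comment = F.upper(F.col("COMMENT_DESC"))

# One when(...) per category: yields the category name if any of its search
# terms appears in the upper-cased comment, and null otherwise.
cases = [
    F.when(reduce(lambda a, b: a | b,
                  [upper_comment.contains(term) for term in terms]),
           key)
    for key, terms in error_terms.items()
]

# coalesce returns the first non-null case; null comments fall through to 'unknown'.
classifiedDF = (cleanDF  # illustrative name, not from the original notes
                .limit(10)
                .select("COMMENT_DESC")
                .withColumn('ERROR_CLASSIFICATION',
                            F.coalesce(*cases, F.lit('unknown'))))
classifiedDF.show()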