Getting Started with PySpark: Difference between revisions

No edit summary
No edit summary
Line 74: Line 74:
)
)
country_totals.show()</pre>
country_totals.show()</pre>
== Better Custom UDF ==
<pre>
from pyspark.sql.functions import count, col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def getErrorType ( errorString ):
    if errorString is None:
        return 'empty'
   
    map = { 'moved': ['MOVED'],
            'zip': ['ZIP'],
            'apt': ['APT','FLAT'],
            'box': ['P O BOX', 'PO BOX'],
            'street': ['STREET','ADDRESS','HOUSE NBR', 'ADD', 'ADRESS', 'ST NUMR', 'WRG ADRS'],
            'phone': ['CONTACT NUMBER', 'NEED NUMBER', 'TEL', 'WRONG NUM', 'WRONG NMBR',  'WRONG #', 'CONTACT NUMBER', 'LANDLINE', 'MOBIL', 'PHONE NO', 'CELL', 'PHONE','PH','MOBILE'],
            'missing': ['PERSON', 'NO RESPONSE', 'REACHABLE', 'NO ANSWER'],
            'missort': ['SORT']
            }   
    for key,searchlist in map.items():
        for searchterm in searchlist:
            print ("{} - {}".format(errorString, searchterm) )
            if ( errorString.find(searchterm) >= 0 ):
                return key   
    return 'unknown'
myudf = udf(getErrorType, StringType())
   
subsetDF = ( cleanDF               
                .select("COMMENT_DESC")
                .withColumn('ERROR_CLASSIFICATION', myudf( cleanDF['COMMENT_DESC'] ) )
          )
unknownDF = subsetDF.select ("*").filter("ERROR_CLASSIFICATION='unknown'")
unknownDF.show(30)
unknownDF.repartition(1).write.format('csv').mode('overwrite').options(header="true",sep=",").save(path="unknown.csv")
error_totals = ( subsetDF
                  .select ( "ERROR_CLASSIFICATION")
                  .groupby("ERROR_CLASSIFICATION")
                  .agg(count("*").alias("count"))
                  .sort(col("count").desc())
)
error_totals.show()</pre>


== Cool trick to display panda data frame ==
== Cool trick to display panda data frame ==

Revision as of 17:53, 1 November 2019

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

import pyspark.sql.functions as F

Load some data

df = spark.read.load("DEX03s - 2019-10-07.csv",
                     format="csv", sep=",", inferSchema="true", header="true")

Find columns that are more than 90% null

threshold = df.count() * .90
null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
to_drop = [k for k, v in null_counts.items() if v >= threshold ]

Drop Null columns

clean = df.drop(*to_drop)
display(clean)

Create a subset of records

subsetDF = cleanDF.limit(100).select("COMMENT_DESC")
map = { 'zip': ['ZIP'], 'moved': ['MOVED'], 'apt': ['APT'], 'box': ['P O BOX'],'street': ['STREET','ADDRESS']  }

print(subsetDF.count())
subsetDF.show()

Categorize records using a User Defined Fucntion (UDF)

from pyspark.sql.functions import count, col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def getErrorType ( errorString ):
    if errorString is None:
        return 'empty'
    
    map = { 'moved': ['MOVED'], 'zip': ['ZIP'], 'apt': ['APT'], 'box': ['P O BOX'],'street': ['STREET','ADDRESS']  }    
    for key,searchlist in map.items():
        for searchterm in searchlist:
            print ("{} - {}".format(errorString, searchterm) )
            if ( errorString.find(searchterm) >= 0 ):
                return key    
    return 'unknown'

myudf = udf(getErrorType, StringType())
    
subsetDF = ( cleanDF                
                .select("COMMENT_DESC")
                .withColumn('ERROR_CLASSIFICATION', myudf( cleanDF['COMMENT_DESC'] ) ) 
           )



print(subsetDF.count())
subsetDF.show()

country_totals = ( subsetDF
                  .select ( "ERROR_CLASSIFICATION")
                  .groupby("ERROR_CLASSIFICATION")
                  .agg(count("*").alias("count"))
                  .sort(col("count").desc())
)
country_totals.show()

Better Custom UDF

from pyspark.sql.functions import count, col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def getErrorType ( errorString ):
    if errorString is None:
        return 'empty'
    
    map = { 'moved': ['MOVED'], 
             'zip': ['ZIP'], 
             'apt': ['APT','FLAT'], 
             'box': ['P O BOX', 'PO BOX'],
             'street': ['STREET','ADDRESS','HOUSE NBR', 'ADD', 'ADRESS', 'ST NUMR', 'WRG ADRS'],
             'phone': ['CONTACT NUMBER', 'NEED NUMBER', 'TEL', 'WRONG NUM', 'WRONG NMBR',  'WRONG #', 'CONTACT NUMBER', 'LANDLINE', 'MOBIL', 'PHONE NO', 'CELL', 'PHONE','PH','MOBILE'],
             'missing': ['PERSON', 'NO RESPONSE', 'REACHABLE', 'NO ANSWER'],
             'missort': ['SORT']
             }    
    for key,searchlist in map.items():
        for searchterm in searchlist:
            print ("{} - {}".format(errorString, searchterm) )
            if ( errorString.find(searchterm) >= 0 ):
                return key    
    return 'unknown'

myudf = udf(getErrorType, StringType())
    
subsetDF = ( cleanDF                
                .select("COMMENT_DESC")
                .withColumn('ERROR_CLASSIFICATION', myudf( cleanDF['COMMENT_DESC'] ) ) 
           )


unknownDF = subsetDF.select ("*").filter("ERROR_CLASSIFICATION='unknown'")
unknownDF.show(30)
unknownDF.repartition(1).write.format('csv').mode('overwrite').options(header="true",sep=",").save(path="unknown.csv")


error_totals = ( subsetDF
                  .select ( "ERROR_CLASSIFICATION")
                  .groupby("ERROR_CLASSIFICATION")
                  .agg(count("*").alias("count"))
                  .sort(col("count").desc())
)
error_totals.show()



Cool trick to display panda data frame

from IPython.display import display, HTML

display(HTML(country_totals.toPandas().to_html()))

Plotting Bar Graph

If you want the count calculated automatically (default)

from plotnine import ggplot, geom_point, aes, stat_smooth, facet_wrap

ggplot( country_totals.limit(10).toPandas() , aes(x='COUNTRY_CD' )  ) + geom_bar()

To specify a Y value explicitly, use stat='identity'

from plotnine import ggplot, geom_point, aes, stat_smooth, facet_wrap

ggplot( country_totals.limit(10).toPandas() , aes(x='COUNTRY_CD',y='count' )  ) + geom_bar(stat='identity')