Getting Started with PySpark
from pyspark.sql import SparkSession
# Build the Spark session for this notebook, or reuse one that already exists.
# The chain is wrapped in parentheses so no backslash continuations are needed.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
import pyspark.sql.functions as F
Load some data
# Read the CSV export with a header row, letting Spark infer column types.
df = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load("DEX03s - 2019-10-07.csv")
)
Find columns that are at least 90% null
# A column qualifies for dropping when at least 90% of its rows are null.
# NOTE: on an empty DataFrame the threshold is 0, so every column qualifies.
threshold = df.count() * .90

# Count the nulls of every column in a single pass: when() yields a value only
# for null rows, and count() ignores the nulls produced for all other rows.
null_counts = (
    df.select(
        [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
    )
    .collect()[0]
    .asDict()
)

# Names of the columns whose null count reaches the threshold.
to_drop = [k for k, v in null_counts.items() if v >= threshold]
Drop Null columns
# The following cells read this frame as `cleanDF`, so bind it under that name.
# `clean` is kept as an alias for any code that still uses the original name.
cleanDF = df.drop(*to_drop)
clean = cleanDF

display(clean)
Create a subset of records
# Work on a small sample (first 100 rows) of the free-text comment column.
subsetDF = cleanDF.limit(100).select("COMMENT_DESC")

# Category -> search terms, kept here for reference. It was previously bound
# to the name `map`, which shadowed the builtin for the rest of the session.
error_keywords = {
    'zip': ['ZIP'],
    'moved': ['MOVED'],
    'apt': ['APT'],
    'box': ['P O BOX'],
    'street': ['STREET', 'ADDRESS'],
}

print(subsetDF.count())
subsetDF.show()
Categorize records using a User Defined Function (UDF)
from pyspark.sql.functions import count, col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def getErrorType(errorString):
    """Classify a free-text comment by the first keyword it contains.

    Args:
        errorString: The comment text, or None for a null value.

    Returns:
        'empty' when errorString is None. Otherwise the category of the first
        search term found as a case-sensitive substring, with categories
        checked in the order moved, zip, apt, box, street. 'unknown' when no
        term matches.
    """
    if errorString is None:
        return 'empty'

    # Named `search_terms` rather than `map` so the builtin is not shadowed.
    search_terms = {
        'moved': ['MOVED'],
        'zip': ['ZIP'],
        'apt': ['APT'],
        'box': ['P O BOX'],
        'street': ['STREET', 'ADDRESS'],
    }

    # The per-term debug print was removed: as a UDF this function runs once
    # per row, so it printed one line for every (row, search term) pair.
    for category, terms in search_terms.items():
        if any(term in errorString for term in terms):
            return category
    return 'unknown'
# Expose the Python classifier to Spark as a UDF that returns a string.
myudf = udf(getErrorType, StringType())

# Attach a classification to every comment in the cleaned data.
subsetDF = (
    cleanDF
    .select("COMMENT_DESC")
    .withColumn("ERROR_CLASSIFICATION", myudf(cleanDF["COMMENT_DESC"]))
)

print(subsetDF.count())
subsetDF.show()

# Number of rows per classification, most frequent first.
country_totals = (
    subsetDF
    .select("ERROR_CLASSIFICATION")
    .groupBy("ERROR_CLASSIFICATION")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())
)

country_totals.show()
Better Custom UDF
from pyspark.sql.functions import count, col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def getErrorType(errorString):
    """Classify a free-text comment by the first keyword it contains.

    Args:
        errorString: The comment text, or None for a null value.

    Returns:
        'empty' when errorString is None. Otherwise the category of the first
        search term found as a case-sensitive substring, with categories
        checked in the order moved, zip, apt, box, street, phone, missing,
        missort. 'unknown' when no term matches.
    """
    if errorString is None:
        return 'empty'

    # Named `search_terms` rather than `map` so the builtin is not shadowed.
    # Matching is by plain substring, so short terms such as 'PH', 'TEL' or
    # 'ADD' also match inside longer words.
    search_terms = {
        'moved': ['MOVED'],
        'zip': ['ZIP'],
        'apt': ['APT', 'FLAT'],
        'box': ['P O BOX', 'PO BOX'],
        'street': ['STREET', 'ADDRESS', 'HOUSE NBR', 'ADD', 'ADRESS',
                   'ST NUMR', 'WRG ADRS'],
        # The duplicate 'CONTACT NUMBER' entry was dropped; it had no effect.
        'phone': ['CONTACT NUMBER', 'NEED NUMBER', 'TEL', 'WRONG NUM',
                  'WRONG NMBR', 'WRONG #', 'LANDLINE', 'MOBIL', 'PHONE NO',
                  'CELL', 'PHONE', 'PH', 'MOBILE'],
        'missing': ['PERSON', 'NO RESPONSE', 'REACHABLE', 'NO ANSWER'],
        'missort': ['SORT'],
    }

    # The per-term debug print was removed: as a UDF this function runs once
    # per row, so it printed one line for every (row, search term) pair.
    for category, terms in search_terms.items():
        if any(term in errorString for term in terms):
            return category
    return 'unknown'
# Expose the Python classifier to Spark as a UDF that returns a string.
myudf = udf(getErrorType, StringType())

# Attach a classification to every comment in the cleaned data.
subsetDF = (
    cleanDF
    .select("COMMENT_DESC")
    .withColumn("ERROR_CLASSIFICATION", myudf(cleanDF["COMMENT_DESC"]))
)

# Rows the classifier could not place; inspect them to find new search terms.
unknownDF = subsetDF.select("*").where("ERROR_CLASSIFICATION='unknown'")
unknownDF.show(30)

# Collapse to a single partition and write the unclassified rows as CSV with a
# header row, replacing any previous output at the same path.
(
    unknownDF
    .repartition(1)
    .write
    .format('csv')
    .mode('overwrite')
    .options(header="true", sep=",")
    .save(path="unknown.csv")
)

# Number of rows per classification, most frequent first.
error_totals = (
    subsetDF
    .select("ERROR_CLASSIFICATION")
    .groupBy("ERROR_CLASSIFICATION")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())
)

error_totals.show()
Cool trick to display a pandas DataFrame
from IPython.display import display, HTML

# Collect the Spark result to pandas and render it as an HTML table.
display(HTML(country_totals.toPandas().to_html()))
Plotting Bar Graph
If you want the count calculated automatically (default)
# geom_bar is added to the import list; the plot below uses it.
from plotnine import ggplot, geom_bar, geom_point, aes, stat_smooth, facet_wrap

# With only an x aesthetic, geom_bar counts the rows per x value itself.
# NOTE(review): country_totals as built earlier in this file is grouped by
# ERROR_CLASSIFICATION; the COUNTRY_CD column presumably comes from a
# different aggregate - confirm before running.
ggplot(country_totals.limit(10).toPandas(), aes(x='COUNTRY_CD')) + geom_bar()
To specify a Y value explicitly, use stat='identity'
# geom_bar is added to the import list; the plot below uses it.
from plotnine import ggplot, geom_bar, geom_point, aes, stat_smooth, facet_wrap

# stat='identity' plots the supplied y column as the bar height instead of
# counting rows.
# NOTE(review): country_totals as built earlier in this file is grouped by
# ERROR_CLASSIFICATION; the COUNTRY_CD column presumably comes from a
# different aggregate - confirm before running.
(
    ggplot(country_totals.limit(10).toPandas(), aes(x='COUNTRY_CD', y='count'))
    + geom_bar(stat='identity')
)
Creating Columns from Rows using Multiple Aggregates
# One row per tracking number, with the exception-type codes pivoted into
# columns. Three aggregates are computed per exception type: the number of
# events and the first and last event-creation values.
# NOTE(review): first()/last() depend on row order, which is not fixed by a
# groupBy - confirm that is acceptable here.
scanLookupDf = (
    scansDf
    .groupBy('shp_trk_nbr')
    .pivot('excp_type_cd')
    .agg(
        count('excp_type_cd').alias('cnt'),
        F.first('evnt_crt_tmstp').alias('first'),
        F.last('evnt_crt_tmstp').alias('last'),
    )
)
Date Functions
Convert a time string with a UTC offset to a UTC ISO-8601 string
from dateutil import parser, tz
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, udf
# Create UTC timezone
utc_zone = tz.gettz('UTC')


def _to_utc_iso(value):
    """Parse a timestamp string and return it as an ISO-8601 string in UTC.

    Args:
        value: Timestamp text understood by dateutil's parser, or None.

    Returns:
        The ISO-8601 string of the parsed time converted to UTC, or None when
        value is None, so null rows stay null instead of failing the job.
    """
    if value is None:
        return None
    return parser.parse(value).astimezone(utc_zone).isoformat()


# UDF applied to the column: string -> parsed timestamp -> UTC -> string.
func = udf(_to_utc_iso, StringType())

myDf = (
    scanDf
    .withColumn('event_timestamp', func(col("evnt_crt_tmstp")))
    .select("evnt_crt_tmstp", "event_timestamp")
)

display(myDf.limit(5).toPandas())
Aggregating
Rank Items Within a Group
from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank, rank, ceil
# Average "PK GROSS INB" per (country, station), rounded to one decimal.
# F.round / F.avg are the Spark column functions, reached through the
# `pyspark.sql.functions as F` import near the top of the file. Unqualified,
# `round` is the Python builtin and `avg` is not imported anywhere in view.
stationRankDf = (
    recentHistoryDf
    .groupBy("CountryCode", "STATION")
    .agg(F.round(F.avg("PK GROSS INB"), 1).alias("PK GROSS INB"))
)

# Rank stations within each country, highest average first.
w = (
    Window
    .partitionBy(stationRankDf['CountryCode'])
    .orderBy(stationRankDf['PK GROSS INB'].desc())
)

# Keep the stations ranked 10 or better per country, then only the US rows.
stationRankDf = (
    stationRankDf
    .select('*', rank().over(w).alias('rank'))
    .filter(col('rank') <= 10)
    .filter("CountryCode == 'US'")
)

stationRankDf.toPandas()