Getting Started with PySpark: Difference between revisions
No edit summary |
|||
(13 intermediate revisions by the same user not shown) | |||
Line 19: | Line 19: | ||
threshold = df.count() * .90 | threshold = df.count() * .90 | ||
null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict() | null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict() | ||
to_drop = [k for k, v in null_counts.items() if v | to_drop = [k for k, v in null_counts.items() if v >= threshold ] | ||
</pre> | </pre> | ||
Line 26: | Line 26: | ||
clean = df.drop(*to_drop) | clean = df.drop(*to_drop) | ||
display(clean)</pre> | display(clean)</pre> | ||
Create a subset of records | |||
<pre> | |||
subsetDF = cleanDF.limit(100).select("COMMENT_DESC") | |||
map = { 'zip': ['ZIP'], 'moved': ['MOVED'], 'apt': ['APT'], 'box': ['P O BOX'],'street': ['STREET','ADDRESS'] } | |||
print(subsetDF.count()) | |||
subsetDF.show()</pre> | |||
Categorize records using a User Defined Fucntion (UDF) | |||
<pre> | |||
from pyspark.sql.functions import count, col | |||
from pyspark.sql.functions import udf | |||
from pyspark.sql.types import StringType | |||
def getErrorType ( errorString ): | |||
if errorString is None: | |||
return 'empty' | |||
map = { 'moved': ['MOVED'], 'zip': ['ZIP'], 'apt': ['APT'], 'box': ['P O BOX'],'street': ['STREET','ADDRESS'] } | |||
for key,searchlist in map.items(): | |||
for searchterm in searchlist: | |||
print ("{} - {}".format(errorString, searchterm) ) | |||
if ( errorString.find(searchterm) >= 0 ): | |||
return key | |||
return 'unknown' | |||
myudf = udf(getErrorType, StringType()) | |||
subsetDF = ( cleanDF | |||
.select("COMMENT_DESC") | |||
.withColumn('ERROR_CLASSIFICATION', myudf( cleanDF['COMMENT_DESC'] ) ) | |||
) | |||
print(subsetDF.count()) | |||
subsetDF.show() | |||
country_totals = ( subsetDF | |||
.select ( "ERROR_CLASSIFICATION") | |||
.groupby("ERROR_CLASSIFICATION") | |||
.agg(count("*").alias("count")) | |||
.sort(col("count").desc()) | |||
) | |||
country_totals.show()</pre> | |||
== Better Custom UDF == | |||
<pre> | |||
from pyspark.sql.functions import count, col | |||
from pyspark.sql.functions import udf | |||
from pyspark.sql.types import StringType | |||
def getErrorType ( errorString ): | |||
if errorString is None: | |||
return 'empty' | |||
map = { 'moved': ['MOVED'], | |||
'zip': ['ZIP'], | |||
'apt': ['APT','FLAT'], | |||
'box': ['P O BOX', 'PO BOX'], | |||
'street': ['STREET','ADDRESS','HOUSE NBR', 'ADD', 'ADRESS', 'ST NUMR', 'WRG ADRS'], | |||
'phone': ['CONTACT NUMBER', 'NEED NUMBER', 'TEL', 'WRONG NUM', 'WRONG NMBR', 'WRONG #', 'CONTACT NUMBER', 'LANDLINE', 'MOBIL', 'PHONE NO', 'CELL', 'PHONE','PH','MOBILE'], | |||
'missing': ['PERSON', 'NO RESPONSE', 'REACHABLE', 'NO ANSWER'], | |||
'missort': ['SORT'] | |||
} | |||
for key,searchlist in map.items(): | |||
for searchterm in searchlist: | |||
print ("{} - {}".format(errorString, searchterm) ) | |||
if ( errorString.find(searchterm) >= 0 ): | |||
return key | |||
return 'unknown' | |||
myudf = udf(getErrorType, StringType()) | |||
subsetDF = ( cleanDF | |||
.select("COMMENT_DESC") | |||
.withColumn('ERROR_CLASSIFICATION', myudf( cleanDF['COMMENT_DESC'] ) ) | |||
) | |||
unknownDF = subsetDF.select ("*").filter("ERROR_CLASSIFICATION='unknown'") | |||
unknownDF.show(30) | |||
unknownDF.repartition(1).write.format('csv').mode('overwrite').options(header="true",sep=",").save(path="unknown.csv") | |||
error_totals = ( subsetDF | |||
.select ( "ERROR_CLASSIFICATION") | |||
.groupby("ERROR_CLASSIFICATION") | |||
.agg(count("*").alias("count")) | |||
.sort(col("count").desc()) | |||
) | |||
error_totals.show()</pre> | |||
== Cool trick to display panda data frame == | |||
<pre>from IPython.display import display, HTML | |||
display(HTML(country_totals.toPandas().to_html()))</pre> | |||
==Plotting Bar Graph== | |||
If you want the count calculated automatically (default) | |||
<pre> | |||
from plotnine import ggplot, geom_point, aes, stat_smooth, facet_wrap | |||
ggplot( country_totals.limit(10).toPandas() , aes(x='COUNTRY_CD' ) ) + geom_bar() | |||
</pre> | |||
To specify a Y value explicitly, use stat='identity' | |||
<pre> | |||
from plotnine import ggplot, geom_point, aes, stat_smooth, facet_wrap | |||
ggplot( country_totals.limit(10).toPandas() , aes(x='COUNTRY_CD',y='count' ) ) + geom_bar(stat='identity') | |||
</pre> | |||
== Creating Columns from Rows using Multiple Aggregates == | |||
<pre>scanLookupDf = ( scansDf | |||
.groupby('shp_trk_nbr') | |||
.pivot('excp_type_cd') | |||
.agg(count('excp_type_cd').alias('cnt'),F.first('evnt_crt_tmstp').alias('first'),F.last('evnt_crt_tmstp').alias('last')) | |||
)</pre> | |||
== Date Functions == | |||
=== Convert Time w/ Time Offset to Timestamp === | |||
<pre> | |||
from dateutil import parser, tz | |||
from pyspark.sql.types import StringType | |||
from pyspark.sql.functions import col, udf | |||
# Create UTC timezone | |||
utc_zone = tz.gettz('UTC') | |||
# Create UDF function that apply on the column | |||
# It takes the String, parse it to a timestamp, convert to UTC, then convert to String again | |||
func = udf(lambda x: parser.parse(x).astimezone(utc_zone).isoformat(), StringType()) | |||
myDf = ( scanDf | |||
.withColumn('event_timestamp', func(col("evnt_crt_tmstp")) ) | |||
.select("evnt_crt_tmstp", "event_timestamp") | |||
) | |||
display ( myDf.limit(5).toPandas() )</pre> | |||
== Aggregating == | |||
=== Rank Items Within a Group === | |||
<pre> | |||
from pyspark.sql.window import Window | |||
from pyspark.sql.functions import percent_rank, rank, ceil | |||
stationRankDf = ( recentHistoryDf | |||
.groupBy ( "CountryCode", "STATION" ) | |||
.agg( | |||
round(avg("PK GROSS INB"),1).alias("PK GROSS INB") | |||
) | |||
) | |||
w = Window.partitionBy(stationRankDf['CountryCode']).orderBy(stationRankDf['PK GROSS INB'].desc()) | |||
stationRankDf = ( stationRankDf | |||
.select('*', rank().over(w).alias('rank')) | |||
.filter(col('rank') <= 10) | |||
.filter ( "CountryCode == 'US'") | |||
) | |||
stationRankDf.toPandas()</pre> |
Latest revision as of 21:35, 21 April 2020
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() import pyspark.sql.functions as F
Load some data
df = spark.read.load("DEX03s - 2019-10-07.csv", format="csv", sep=",", inferSchema="true", header="true")
Find columns that are more than 90% null
threshold = df.count() * .90 null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict() to_drop = [k for k, v in null_counts.items() if v >= threshold ]
Drop Null columns
clean = df.drop(*to_drop) display(clean)
Create a subset of records
subsetDF = cleanDF.limit(100).select("COMMENT_DESC") map = { 'zip': ['ZIP'], 'moved': ['MOVED'], 'apt': ['APT'], 'box': ['P O BOX'],'street': ['STREET','ADDRESS'] } print(subsetDF.count()) subsetDF.show()
Categorize records using a User Defined Fucntion (UDF)
from pyspark.sql.functions import count, col from pyspark.sql.functions import udf from pyspark.sql.types import StringType def getErrorType ( errorString ): if errorString is None: return 'empty' map = { 'moved': ['MOVED'], 'zip': ['ZIP'], 'apt': ['APT'], 'box': ['P O BOX'],'street': ['STREET','ADDRESS'] } for key,searchlist in map.items(): for searchterm in searchlist: print ("{} - {}".format(errorString, searchterm) ) if ( errorString.find(searchterm) >= 0 ): return key return 'unknown' myudf = udf(getErrorType, StringType()) subsetDF = ( cleanDF .select("COMMENT_DESC") .withColumn('ERROR_CLASSIFICATION', myudf( cleanDF['COMMENT_DESC'] ) ) ) print(subsetDF.count()) subsetDF.show() country_totals = ( subsetDF .select ( "ERROR_CLASSIFICATION") .groupby("ERROR_CLASSIFICATION") .agg(count("*").alias("count")) .sort(col("count").desc()) ) country_totals.show()
Better Custom UDF
from pyspark.sql.functions import count, col from pyspark.sql.functions import udf from pyspark.sql.types import StringType def getErrorType ( errorString ): if errorString is None: return 'empty' map = { 'moved': ['MOVED'], 'zip': ['ZIP'], 'apt': ['APT','FLAT'], 'box': ['P O BOX', 'PO BOX'], 'street': ['STREET','ADDRESS','HOUSE NBR', 'ADD', 'ADRESS', 'ST NUMR', 'WRG ADRS'], 'phone': ['CONTACT NUMBER', 'NEED NUMBER', 'TEL', 'WRONG NUM', 'WRONG NMBR', 'WRONG #', 'CONTACT NUMBER', 'LANDLINE', 'MOBIL', 'PHONE NO', 'CELL', 'PHONE','PH','MOBILE'], 'missing': ['PERSON', 'NO RESPONSE', 'REACHABLE', 'NO ANSWER'], 'missort': ['SORT'] } for key,searchlist in map.items(): for searchterm in searchlist: print ("{} - {}".format(errorString, searchterm) ) if ( errorString.find(searchterm) >= 0 ): return key return 'unknown' myudf = udf(getErrorType, StringType()) subsetDF = ( cleanDF .select("COMMENT_DESC") .withColumn('ERROR_CLASSIFICATION', myudf( cleanDF['COMMENT_DESC'] ) ) ) unknownDF = subsetDF.select ("*").filter("ERROR_CLASSIFICATION='unknown'") unknownDF.show(30) unknownDF.repartition(1).write.format('csv').mode('overwrite').options(header="true",sep=",").save(path="unknown.csv") error_totals = ( subsetDF .select ( "ERROR_CLASSIFICATION") .groupby("ERROR_CLASSIFICATION") .agg(count("*").alias("count")) .sort(col("count").desc()) ) error_totals.show()
Cool trick to display panda data frame
from IPython.display import display, HTML display(HTML(country_totals.toPandas().to_html()))
Plotting Bar Graph
If you want the count calculated automatically (default)
from plotnine import ggplot, geom_point, aes, stat_smooth, facet_wrap ggplot( country_totals.limit(10).toPandas() , aes(x='COUNTRY_CD' ) ) + geom_bar()
To specify a Y value explicitly, use stat='identity'
from plotnine import ggplot, geom_point, aes, stat_smooth, facet_wrap ggplot( country_totals.limit(10).toPandas() , aes(x='COUNTRY_CD',y='count' ) ) + geom_bar(stat='identity')
Creating Columns from Rows using Multiple Aggregates
scanLookupDf = ( scansDf .groupby('shp_trk_nbr') .pivot('excp_type_cd') .agg(count('excp_type_cd').alias('cnt'),F.first('evnt_crt_tmstp').alias('first'),F.last('evnt_crt_tmstp').alias('last')) )
Date Functions
Convert Time w/ Time Offset to Timestamp
from dateutil import parser, tz from pyspark.sql.types import StringType from pyspark.sql.functions import col, udf # Create UTC timezone utc_zone = tz.gettz('UTC') # Create UDF function that apply on the column # It takes the String, parse it to a timestamp, convert to UTC, then convert to String again func = udf(lambda x: parser.parse(x).astimezone(utc_zone).isoformat(), StringType()) myDf = ( scanDf .withColumn('event_timestamp', func(col("evnt_crt_tmstp")) ) .select("evnt_crt_tmstp", "event_timestamp") ) display ( myDf.limit(5).toPandas() )
Aggregating
Rank Items Within a Group
from pyspark.sql.window import Window from pyspark.sql.functions import percent_rank, rank, ceil stationRankDf = ( recentHistoryDf .groupBy ( "CountryCode", "STATION" ) .agg( round(avg("PK GROSS INB"),1).alias("PK GROSS INB") ) ) w = Window.partitionBy(stationRankDf['CountryCode']).orderBy(stationRankDf['PK GROSS INB'].desc()) stationRankDf = ( stationRankDf .select('*', rank().over(w).alias('rank')) .filter(col('rank') <= 10) .filter ( "CountryCode == 'US'") ) stationRankDf.toPandas()