9  Aggregated calculations

Aggregated calculations are vital for feature engineering, and every data scientist, data engineer, and ML engineer needs this logic readily at hand. For example, we may want to know the number of employees in each business unit, the sales by month, or the average traffic per hour at a given location.

Sometimes we are creating a new dimension table based on the grouping variable. Other times we want to keep the observational unit of the original table but append additional columns with the summary values. In SQL, we differentiate the two calculations by the GROUP BY method and the PARTITION BY method.

Here are example calculations using Pyspark and SQL.

The examples below use the following default DataFrame.

import pandas as pd
# create a pandas dataframe
pdf = pd.DataFrame({'Section':[1,2,2,3,3,3], 'Student':['a','b','c', 'd', 'e','f'], 'Score':[90, 85, 75, 95, 65, 98]})
# convert to a Spark dataframe, assuming your spark session is already created
df = spark.createDataFrame(pdf)

Section  Student  Score
1        a        90
2        b        85
2        c        75
3        d        95
3        e        65
3        f        98

Using the above df, we can create a temporary view in Spark:

df.createOrReplaceTempView("df")

9.1 GROUP BY

When using ‘GROUP BY’ functions or methods in the varied languages of data science, the resulting table’s observational unit (row) is defined by the levels of the variable used in the ‘GROUP BY’ argument. We move from many rows to fewer rows, one row per level of the grouping variable.

9.1.1 Language-specific help files

Each language combines its GROUP BY method with its respective aggregation functions; see the language-specific documentation for details.

9.1.2 Examples

The following two examples compute the average and the sample standard deviation of Score for each section.

Section  average  sd
1        90       nan
2        80       7.07107
3        86       18.2483

9.1.2.1 Pyspark

import pyspark.sql.functions as F

df.groupBy('Section').agg(
  F.mean('Score').alias("average"),
  F.stddev_samp('Score').alias("sd")
).show()

9.1.2.2 SQL

SELECT
  Section,
  MEAN(Score) AS average,
  STDDEV_SAMP(Score) AS sd
FROM df
GROUP BY Section
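
Since df was registered as a temporary view above, one way to run this query from Pyspark is through spark.sql(); a minimal sketch:

# run the GROUP BY query against the temporary view "df"
spark.sql("""
    SELECT
      Section,
      MEAN(Score) AS average,
      STDDEV_SAMP(Score) AS sd
    FROM df
    GROUP BY Section
""").show()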

9.2 Window

At its core, a window function calculates a return value for every input row of a table based on a group of rows, called the Frame. Every input row can have a unique frame associated with it. This characteristic makes window functions more powerful than other functions and lets users concisely express data processing tasks that are hard (if not impossible) to express without them (ref).
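
As a minimal sketch of the per-row frame idea (separate from the worked example below), the window here gives each row of our df its own frame: every row in the same Section, ordered by Score, from the start of the partition up to and including the current row. A running maximum is then computed over that frame.

from pyspark.sql import Window
import pyspark.sql.functions as F

# each row's frame: rows in the same Section, ordered by Score,
# from the start of the partition up to and including the current row
w_running = Window.partitionBy('Section').orderBy('Score') \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# the running maximum is computed over each row's frame
df.withColumn('running_max', F.max('Score').over(w_running)).show()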

9.2.1 Language-specific help files

9.2.2 Examples

Here are example calculations using Pyspark and SQL. Using the example table above, we want to create the following table.

Section  Student  Score  rank  min
1        a        90     1     90
2        b        85     1     75
2        c        75     2     75
3        d        95     2     65
3        e        65     3     65
3        f        98     1     65

9.2.2.1 Pyspark

from pyspark.sql import Window
import pyspark.sql.functions as F

# ordered window: used by rank(); the ordering defines the ranking
window_order = Window.partitionBy('Section').orderBy(F.col('Score').desc())
# unordered window: used by min() so the frame covers the whole Section
window = Window.partitionBy('Section')

df.withColumn("rank", F.rank().over(window_order)) \
  .withColumn("min", F.min('Score').over(window)) \
  .sort('Student') \
  .show()

9.2.2.2 SQL

Then, we can use the following SQL command.

SELECT Section, Student, Score,
  RANK() OVER (PARTITION BY Section ORDER BY Score DESC) AS rank,
  MIN(Score) OVER (PARTITION BY Section) AS min
FROM df

9.2.3 Describing Window calculations

I especially like TMichel’s response, which has the highest vote, although the second response by KARTHICK seems to be the best answer to this specific question. Here is how TMichel dissects the details of a Window example in Pyspark (with minor edits by me).

Using collect_list() and groupBy() will result in an unordered list of values. Depending on how your data is partitioned, Spark will append values to your list as soon as it finds a row in the group. The order then depends on how Spark plans your aggregation over the executors.

Assume we have the following data.

id  date        value
1   2014-01-03  10
1   2014-01-04  5
1   2014-01-05  15
1   2014-01-06  20
2   2014-02-10  100
2   2014-03-11  500
2   2014-04-15  1500
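
For example, the plain groupBy() version below (a sketch that uses the df_date DataFrame built in the Example Data section) collects one list of values per id, but the order of the values inside each list is not guaranteed.

import pyspark.sql.functions as F

# collect_list() with groupBy(): one list per id, but the order of the
# values inside each list is not guaranteed
df_date.groupBy('id') \
    .agg(F.collect_list('value').alias('value_list')) \
    .show(truncate=False)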

A Window function allows you to control that situation, grouping rows by a specific value so you can operate over each of the resultant groups:

w = Window.partitionBy('id').orderBy('date')
  • partitionBy() - you want groups/partitions of rows with the same id
  • orderBy() - you want each row in the group to be sorted by date

The defined Window scope (rows with the same id, sorted by date) then frames the collect_list operation.

F.collect_list('value').over(w)

At this point, the dataframe has a new column, sorted_list, containing an ordered list of values sorted by date.

id  date        value  sorted_list
1   2014-01-03  10     [10, 5, 15, 20]
1   2014-01-04  5      [10, 5, 15, 20]
1   2014-01-05  15     [10, 5, 15, 20]
1   2014-01-06  20     [10, 5, 15, 20]
2   2014-02-10  100    [100, 500, 1500]
2   2014-03-11  500    [100, 500, 1500]
2   2014-04-15  1500   [100, 500, 1500]
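
Putting the pieces together, here is a sketch that again assumes the df_date DataFrame from the Example Data section. Note that with an orderBy() and no explicit frame, the default frame only runs up to the current row, so collect_list() would build a growing, row-by-row list; spanning the whole partition with rowsBetween() reproduces the full list shown in every row of the table above.

from pyspark.sql import Window
import pyspark.sql.functions as F

# order rows by date, but let every row see the whole partition so each
# row carries the complete, date-ordered list of values
w_full = Window.partitionBy('id').orderBy('date') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df_date.withColumn('sorted_list', F.collect_list('value').over(w_full)) \
    .show(truncate=False)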

9.3 Example Data

from pyspark.sql import functions as F
# data for the Window/collect_list example
date_data = [
    (1, "2014-01-03", 10),
    (1, "2014-01-04", 5),
    (1, "2014-01-05", 15),
    (1, "2014-01-06", 20),
    (2, "2014-02-10", 100),
    (2, "2014-03-11", 500),
    (2, "2014-04-15", 1500)
]
date_columns = ["id", "date", "value"]
df_date = spark.createDataFrame(data=date_data, schema=date_columns) \
    .withColumn("date", F.to_date("date"))

# data for the GROUP BY and Window examples
student_data = [
    (1, "a", 90),
    (2, "b", 85),
    (2, "c", 75),
    (3, "d", 95),
    (3, "e", 65),
    (3, "f", 98)
]
student_columns = ["Section", "Student", "Score"]
df = spark.createDataFrame(data=student_data, schema=student_columns)

9.4 References