Pyspark

Pyspark – cheatsheet with comparison to SQL

I probably don’t need to convince you to learn PySpark. There are multiple reasons to do so: first of all, PySpark is one of the most widely used big data processing frameworks, providing support for large-scale data processing with Apache Spark, which currently dominates the Big Data world. How to learn it? The simplest way is by analogy to something we already know. Today I would like to share a different kind of article. In the past I developed a comprehensive cheatsheet to help myself master PySpark. On my journey to becoming proficient in Spark, I initially leveraged my familiarity with SQL to facilitate my understanding of DataFrames in PySpark. This approach proved to be an effective method for gaining a solid understanding of the PySpark library.

To start, let’s define what PySpark is. PySpark is the Python API for Apache Spark, an open-source, distributed computing system used for big data processing and analysis. It allows developers to process large amounts of data in a parallel, fast, and efficient manner using Python. Keep in mind that in some cases SQL cannot be translated directly: Spark has the concept of a table, but in reality a table is just metastore metadata pointing to files in storage.
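
All of the examples below assume an existing SparkSession available as spark (as in a notebook environment or the pyspark shell, where it is predefined). If you run the snippets as a standalone script, a session can be created roughly like this (a minimal sketch; the application name is just an example):

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; in notebooks and the pyspark shell
# the spark object already exists, so this step is not needed there.
spark = SparkSession.builder.appName("pyspark-sql-cheatsheet").getOrCreate()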

To follow along with the PySpark examples below, you can execute the following queries on your cluster to create the sample tables:

CREATE TABLE employees
(
employeeId INT,
employeeName  STRING,
employeeSurname STRING,
employeeTitle STRING,
age INT,
city STRING,
birthdate DATE,
salary DECIMAL
)

CREATE OR REPLACE TABLE sales
(
EmployeeId INT,
Quantity DECIMAL,
UnitPrice DECIMAL,
City STRING,
Date DATE
)

INSERT INTO employees (employeeId,employeeName, employeeSurname, employeeTitle, age, city, birthdate, salary)
VALUES 
(1,"John", "Doe", "Manager", 35, "New York", "1986-05-10", 75000),
(2,"Jane", "Doe", "Developer", 32, "London", "1989-02-15", 65000),
(3,"Jim", "Smith", "Architect", 40, "Paris", "1980-08-20", 85000),
(4,"Sarah", "Johnson", "Designer", 29, "Berlin", "1992-12-01", 55000),
(5,"Michael", "Brown", "Product Manager", 38, "Tokyo", "1984-06-06", 75000),
(6,"Emily", "Davis", "Data Analyst", 31, "Sydney", "1990-09-12", 65000),
(7,"David", "Wilson", "Salesperson", 33, "Toronto", "1988-07-01", 55000),
(8,"William", "Johnson", "Support Engineer", 36, "Beijing", "1985-04-01", 65000),
(9,"Brian", "Anderson", "Marketing Manager", 37, "Shanghai", "1983-05-15", 75000),
(10,"James", "Lee", "Operations Manager", 39, "Seoul", "1981-03-01", 85000),
(11,"Emily", "Parker", "HR Manager", 30, "Dubai", "1991-12-25", 75000),
(12,"Jacob", "Williams", "Accountant", 34, "New Delhi", "1987-06-01", 65000);

INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (1, 10, 20, 'New York',"2023-03-01");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (1, 5, 10, 'London',"2023-03-01");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (3, 15, 15, 'Paris',"2023-03-02");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (4, 20, 30, 'Berlin',"2023-03-02");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (4, 25, 10, 'Tokyo',"2023-03-03");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (4, 30, 20, 'Sydney',"2023-03-03");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (5, 35, 25, 'Beijing',"2023-03-03");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (6, 40, 30, 'Shanghai',"2023-03-04");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (7, 45, 35, 'New Delhi',"2023-03-05");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (7, 50, 40, 'New Delhi',"2023-03-05");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (9, 55, 45, 'Seoul',"2023-03-05");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (10, 60, 50, 'Rio de Janeiro',"2023-03-05");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (10, 65, 55, 'Paris',"2023-03-06");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (NULL, 70, 60, 'Mexico City',"2023-03-06");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (NULL, 75, 65, 'Mumbai',"2023-03-08");

Feel free to adjust them to your needs.
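
If you prefer to stay entirely in PySpark, you can also build the sample data directly as DataFrames and register them as temporary views, so that spark.table and the examples below still work. A minimal sketch with just a couple of the employees rows above (the sales table can be built the same way):

from datetime import date
from pyspark.sql import Row

# A small subset of the sample rows above, created directly as a DataFrame
employees_df = spark.createDataFrame([
    Row(employeeId=1, employeeName="John", employeeSurname="Doe", employeeTitle="Manager",
        age=35, city="New York", birthdate=date(1986, 5, 10), salary=75000.0),
    Row(employeeId=2, employeeName="Jane", employeeSurname="Doe", employeeTitle="Developer",
        age=32, city="London", birthdate=date(1989, 2, 15), salary=65000.0),
])

# Register the DataFrame so it can be queried as a table
employees_df.createOrReplaceTempView("employees")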

SELECT all columns from table

The statement is used to retrieve all columns and rows of data from the table.

SQL:

SELECT * FROM employees

Pyspark:

df = spark.table("employees")
df.show()
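
The same result can also be obtained by running the SQL statement directly with spark.sql, which returns a DataFrame as well:

df = spark.sql("SELECT * FROM employees")
df.show()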

Select specific columns from table

This query retrieves the values of the columns “employeeName,” “employeeSurname,” and “employeeTitle” from the “employees” table.

SQL:

SELECT employeeName,employeeSurname,employeeTitle FROM employees

Pyspark:

df = spark.table("employees")
df = df.select("employeeName", "employeeSurname", "employeeTitle")
df.show()

Order result

This query retrieves the values of the columns “employeeName”, “employeeSurname”, and “employeeTitle” from the “employees” table. The results will be ordered in ascending order based on the values in the “employeeSurname” column.

SQL:

SELECT employeeName,employeeSurname,employeeTitle FROM employees ORDER BY employeeSurname

Pyspark:

df = spark.table("employees")
df = df.select("employeeName", "employeeSurname", "employeeTitle")
df = df.orderBy("employeeSurname")
df.show()

Order result descending

This is a SQL query that retrieves the values of the columns “employeeName”, “employeeSurname”, and “employeeTitle” from a table called “employees”. The results will be ordered in descending order based on the values in the “employeeSurname” column.

SQL:

SELECT employeeName,employeeSurname,employeeTitle FROM employees ORDER BY employeeSurname DESC

Pyspark:

df = spark.table("employees")
df = df.select("employeeName", "employeeSurname", "employeeTitle")
df = df.orderBy(df["employeeSurname"].desc())
df.show()

Select TOP N rows

The query retrieves the “employeeName”, “employeeSurname”, and “employeeTitle” columns from the “employees” table. It limits the number of returned rows to 10, with the first 10 rows being determined by the sorting order specified in the “ORDER BY” clause, which sorts the results by the “employeeSurname” column. This means that the query will return the first 10 employees from the table sorted by their surname. (Note that TOP is T-SQL syntax; in Spark SQL the equivalent is a LIMIT 10 clause at the end of the query.)

SQL:

SELECT TOP 10 employeeName,employeeSurname,employeeTitle FROM employees ORDER BY employeeSurname

Pyspark:

df = spark.table("employees")
df = df.select("employeeName", "employeeSurname", "employeeTitle")
df = df.orderBy("employeeSurname")
df = df.limit(10)
df.show()

Adding aliases

This query retrieves data from the “employees” table and selects the columns “employeeName”, “employeeSurname”, and “employeeTitle”. The “AS” clause is used to assign new names, or aliases, to each of the selected columns.

SQL:

SELECT employeeName AS Name ,employeeSurname AS Surname,employeeTitle AS Title FROM employees

Pyspark:

df = spark.table("employees")
df = df.withColumnRenamed("employeeName", "Name")
df = df.withColumnRenamed("employeeSurname", "Surname")
df = df.withColumnRenamed("employeeTitle", "Title")
df.show()
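
If you want the result to match the SQL exactly (only the three aliased columns), an alternative is to combine select with alias, for example:

from pyspark.sql.functions import col

df = spark.table("employees")
df = df.select(
    col("employeeName").alias("Name"),
    col("employeeSurname").alias("Surname"),
    col("employeeTitle").alias("Title"),
)
df.show()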

Filtering – greater than

This query retrieves data from the “employees” table and selects all columns by using the wildcard selector “*”. The “WHERE” clause is used to filter the results based on a condition. In this case, the condition is “Age > 35”, which means that only rows where the value in the “Age” column is greater than 35 will be returned.

SQL:

SELECT * FROM employees
WHERE Age>35

Pyspark:

df = spark.table("employees")
df = df.filter(df["Age"] > 35)
df.show()

Filtering – logical AND

The “WHERE” clause is used to filter the results based on multiple conditions. In this case, the conditions are “Age > 35” and “City = ‘Seoul’”. These conditions are combined using the logical operator “AND”, meaning that only rows where both conditions are true will be returned.

SQL:

SELECT * FROM employees
WHERE Age>35 AND City = 'Seoul'

Pyspark:

df = spark.table("employees")
df = df.filter((df["Age"] > 35) & (df["City"] == "Seoul"))
df.show()
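
filter also accepts a SQL expression passed as a string, so the same condition can be written almost exactly like the WHERE clause:

df = spark.table("employees")
df = df.filter("Age > 35 AND City = 'Seoul'")
df.show()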

Filtering – logical OR

The “WHERE” clause is used to filter the results based on multiple conditions. In this case, the conditions are “Age > 35” and “City = ‘Seoul’”. These conditions are combined using the logical operator “OR”, meaning that rows where either one or both conditions are true will be returned.

SQL:

SELECT * FROM employees
WHERE Age>35 OR City = 'Seoul'

Pyspark:

df = spark.table("employees")
df = df.filter((df["Age"] > 35) | (df["City"] == "Seoul"))
df.show()

Filtering – wildcard

The “WHERE” clause is used to filter the results based on a condition using a pattern match. In this case, the condition is “City LIKE ‘S%’”, which means that only rows where the value in the “City” column starts with the letter “S” will be returned. The “%” symbol is used as a wildcard character, representing zero or more characters.

SQL:

SELECT * FROM employees
WHERE City LIKE 'S%'

Pyspark:

df = spark.table("employees") 
df = df.filter(df["City"].like("S%")) 
df.show()
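
For a simple prefix match like this one, the startswith column method is an equivalent alternative to like:

df = spark.table("employees")
df = df.filter(df["City"].startswith("S"))
df.show()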

Filtering – BETWEEN

The “WHERE” clause is used to filter the results based on a range of values. In this case, the range is defined using the “BETWEEN” operator, with the values ‘19900101’ and ‘20000101’ as the lower and upper bounds, respectively. Only rows where the value in the “BirthDate” column is within this range will be returned.

SQL:

SELECT * FROM employees
WHERE BirthDate BETWEEN '19900101' AND '20000101'

Pyspark:

df = spark.table("employees")
df = df.filter(df["BirthDate"].between("1990-01-01", "2000-01-01")) 
df.show()

Filtering – not equal

The “WHERE” clause is used to filter the results based on a condition. In this case, the condition is “City <> ‘Toronto’”, which means that only rows where the value in the “City” column is not equal to ‘Toronto’ will be returned.

SQL:

SELECT * FROM employees
WHERE City <> 'Toronto'

Pyspark:

df = spark.read.table("employees")
df.filter(df["City"] != 'Toronto').select("*").show()

Filtering – IN

The “WHERE” clause is used to filter the results based on a condition. In this case, the condition is “City IN (‘Tokyo’,’London’,’New York’)”, which means that only rows where the value in the “City” column is equal to one of the cities listed in the parentheses will be returned. The “IN” operator is used to specify a set of values to check for equality.

SQL:

SELECT * FROM employees
WHERE City IN ('Tokyo','London','New York')

Pyspark:

df = spark.read.table("employees") 
df = df.filter(df["City"].isin(['Tokyo', 'London', 'New York']))
df.show()

Filtering – NOT IN

The “WHERE” clause is used to filter the results based on a condition. In this case, the condition is “City NOT IN (‘Tokyo’,’London’,’New York’)”, which means that only rows where the value in the “City” column is not equal to any of the cities listed in the parentheses will be returned. The “NOT IN” operator is used to specify a set of values to check for inequality.

SQL:

SELECT * FROM employees
WHERE City NOT IN ('Tokyo','London','New York')

Pyspark:

df = spark.read.table("employees") 
df = df.filter(~df["City"].isin(['Tokyo', 'London', 'New York']))
df.show()

Aggregate – COUNT

This query retrieves data from the “employees” table and selects the “City” column and a count of the number of rows for each city.

SQL:

SELECT City,COUNT(*) FROM employees
GROUP BY City

Pyspark:

from pyspark.sql.functions import count
df = spark.read.table("employees") 
df = df.groupBy("City").agg(count("*").alias("Count")).select("City", "Count")
df.show()
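
For a plain row count per group, the count shortcut on the grouped data gives the same result and produces a column named "count":

df = spark.read.table("employees")
df = df.groupBy("City").count()
df.show()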

Aggregate – MIN,MAX,AVG,SUM

This query retrieves data from the “employees” table and selects the “City” column and various aggregate functions applied to the “Salary” column for each city.

SQL:

SELECT City,AVG(Salary),MIN(Salary),MAX(Salary),SUM(Salary) FROM employees
GROUP BY City

Pyspark:

from pyspark.sql.functions import avg, min, max, sum 
df = spark.read.table("employees") 
df = df.groupBy("City").agg(avg("Salary").alias("avg_salary"), min("Salary").alias("min_salary"), max("Salary").alias("max_salary"), sum("Salary").alias("total_salary"))
df.show()

Aggregate – HAVING

This query retrieves data from the “employees” table and selects the “City” column and the average salary of employees in each city. The “GROUP BY” clause is used to group the rows in the “employees” table by the “City” column. The “AVG(Salary)” function is used to calculate the average salary of employees in each city. The “HAVING” clause is used to further filter the groups based on the aggregate result. In this case, the condition is “Salary>70000”, which means that only groups where the average salary of employees is greater than 70000 will be returned.

SQL:

SELECT City,AVG(Salary) AS Salary FROM employees
GROUP BY City
HAVING Salary>70000

Pyspark:

from pyspark.sql.functions import avg, col

df = spark.read.table("employees")
df = df.groupBy("City").agg(avg(col("Salary").cast("double")).alias("Salary")).filter(col("Salary") > 70000).select("City", "Salary")
df.show()

SELECT DISTINCT

The “SELECT DISTINCT” statement is used to return only unique values from the “City” column. This means that if there are multiple employees with the same city, the query will only return one instance of that city.

SQL:

SELECT DISTINCT City FROM employees

Pyspark:

df = spark.read.table("employees") 
df = df.select("City").distinct()
df.show()
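
The same result can be achieved with dropDuplicates, which is also useful when you only want to deduplicate on a subset of columns:

df = spark.read.table("employees")
df = df.select("City").dropDuplicates()
df.show()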

Calculated column

This query performs a simple calculation to add a new column to a dataframe. The calculation multiplies the values of two existing columns in the dataframe (named “Quantity” and “UnitPrice”) to create a new column named “SalesAmount”.

SQL

SELECT Quantity * UnitPrice AS SalesAmount From Sales

Pyspark:

df = spark.read.table("sales") 
df = df.withColumn("SalesAmount", df["Quantity"] * df["UnitPrice"])
df.show()

Note that withColumn replaces a column if one with the same name already exists, so the same method can be used to overwrite an existing column (there is no separate overwrite parameter).
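
For example, to overwrite the existing UnitPrice column with a recalculated value (a minimal sketch; the 10% discount is only an illustration):

df = spark.read.table("sales")
# Passing an existing column name to withColumn replaces that column
df = df.withColumn("UnitPrice", df["UnitPrice"] * 0.9)
df.show()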

SUBSTRING, LEFT, RIGHT

This query retrieves data from the “employees” table and selects three columns, each using a different string function: “LEFT”, “RIGHT”, and “SUBSTRING”. The “LEFT” function returns the first 2 characters of the city name, the “RIGHT” function returns the last 2 characters of the city name, and the “SUBSTRING” function returns 2 characters of the city name starting from the third character.

SQL:

SELECT LEFT(City,2),RIGHT(City,2),SUBSTRING(City,3,2) FROM Employees

Pyspark:

from pyspark.sql.functions import substring

df = spark.read.table("sales") 
df = df.select( df["City"],substring(df["City"], 1, 2).alias("left"), substring(df["City"], -2, 2).alias("right"), substring(df["City"], 3, 2).alias("substring") )
df.show()

Concatenation

The “CONCAT” function concatenates (joins) two or more strings into a single string. In this case, the first argument to the “CONCAT” function is the string “Employee: “, and the remaining arguments are the “employeeName” and “employeeSurname” columns, separated by a space.

SQL:

SELECT CONCAT('Employee: ', employeeName, ' ', employeeSurname) FROM Employees

Pyspark

from pyspark.sql.functions import lit,concat 
df = spark.read.table("employees") 
df = df.select(concat(lit("Employee: "), df["employeeName"], lit(" "), df["employeeSurname"]).alias("full_name"))
df.show()
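
When the concatenated pieces should be joined with a single separator, concat_ws is a convenient alternative (the separator is the first argument):

from pyspark.sql.functions import concat, concat_ws, lit

df = spark.read.table("employees")
df = df.select(concat(lit("Employee: "), concat_ws(" ", df["employeeName"], df["employeeSurname"])).alias("full_name"))
df.show()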

Filtering – NOT IN and subquery

This query retrieves data from the “employees” table and selects all rows where the city is not present in the “City” column of the “Sales” table.

SQL

SELECT * FROM Employees
WHERE City NOT IN
(SELECT City FROM Sales)

Pyspark

df1 = spark.read.table("employees") 
df2 = spark.read.table("sales") 
df = df1.join(df2, "City", "left_anti")
df.show()

Subquery with filtering

This query retrieves data from the “employees” table, creates a subquery that groups the employees by city and counts the number of employees in each city, and then filters the result to only show cities with fewer than 2 employees.

SQL

SELECT City FROM
(
SELECT City,COUNT(*) AS Cnt FROM employees
GROUP BY City
) AS t
WHERE Cnt<2

Pyspark

from pyspark.sql.functions import count 
df = spark.read.table("employees") 
df = df.select("City").join(df.groupBy("City").agg(count("*").alias("cnt")), "City").where("cnt < 2")
df.show()

JOIN – Inner join

This query performs a join between the “employees” and “Sales” tables, which combines data from both tables based on the “employeeId” column.

SQL

SELECT e.City, s.Quantity AS Qty FROM employees AS e
JOIN Sales AS s
ON s.employeeId = e.employeeId

Pyspark

df1 = spark.read.table("employees")
df2 = spark.read.table("sales") 
df= df1.join(df2,df1["employeeId"] == df2["employeeId"]).select(df1["City"], df2["Quantity"].alias("Qty"))
df.show()

JOIN – Left join

This query performs a left join between the “employees” and “Sales” tables, which combines data from both tables based on the “employeeId” column. The left join returns all rows from the “employees” table and only matching rows from the “Sales” table. If there is no matching data in the “Sales” table for a particular employee, NULL values will be displayed for the “Quantity” column.

SQL

SELECT e.City, s.Quantity AS Qty FROM employees AS e
LEFT JOIN Sales AS s
ON s.employeeId = e.employeeId

Pyspark

df1 = spark.read.table("employees")
df2 = spark.read.table("sales") 
df= df1.join(df2,df1["employeeId"] == df2["employeeId"],"left").select(df1["City"], df2["Quantity"].alias("Qty"))
df.show()

JOIN – right join

This query performs a right join between the “employees” and “Sales” tables, which combines data from both tables based on the “employeeId” column. The right join returns all rows from the “Sales” table and only matching rows from the “employees” table. If there is no matching data in the “employees” table for a particular sale, NULL values will be displayed for the “City” column.

SQL

SELECT e.City, s.Quantity AS Qty FROM employees AS e
RIGHT JOIN Sales AS s
ON s.employeeId = e.employeeId

Pyspark

df1 = spark.read.table("employees")
df2 = spark.read.table("sales") 
df= df1.join(df2,df1["employeeId"] == df2["employeeId"],"right").select(df1["City"], df2["Quantity"].alias("Qty"))
df.show()

JOIN – full join

The query performs a full join between the “employees” table and the “Sales” table, using the “employeeId” column as the join condition. The result will return all rows from both tables, including matching and non-matching rows. For non-matching rows, the values from the other table will be filled with NULL.

SQL

SELECT e.City, s.Quantity AS Qty FROM employees AS e
FULL JOIN Sales AS s
ON s.employeeId = e.employeeId

Pyspark

df1 = spark.read.table("employees")
df2 = spark.read.table("sales") 
df= df1.join(df2,df1["employeeId"] == df2["employeeId"],"full").select(df1["City"], df2["Quantity"].alias("Qty"))
df.show()

JOIN – Cross join

This query performs a cross join between two tables, and returns all columns from both tables. The cross join results in all possible combinations of rows from both tables.

SQL

SELECT * FROM employees AS e
CROSS JOIN Sales AS s

Pyspark

df1 = spark.table("employees")
df2 = spark.table("sales")
df = df1.crossJoin(df2)
df.show()

Working on sets – UNION

This query retrieves the employeeName and employeeSurname columns from the employees table, where the value of the City column is either ‘Toronto’ or ‘New Delhi’. The UNION operation is used to combine the results of the individual queries into a single dataset. The result will contain only unique rows, with duplicates removed, so even though the ‘New Delhi’ query appears twice, its rows are returned only once.

SQL

SELECT employeeName, employeeSurname FROM employees
WHERE City='Toronto'
UNION
SELECT employeeName, employeeSurname FROM employees
WHERE City='New Delhi'
UNION 
SELECT employeeName, employeeSurname FROM employees 
WHERE City='New Delhi'

Pyspark

from pyspark.sql.functions import col

df = spark.table("employees") 
df1 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "Toronto") 
df2 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "New Delhi")
df3 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "New Delhi") 
df = df1.union(df2).union(df3).distinct()
df.show()

Working on sets – UNION ALL

This query retrieves the employeeName and employeeSurname columns from the employees table, where the value of the City column is either ‘Toronto’ or ‘New Delhi’. The UNION ALL keyword is used to combine the results of the three separate queries into a single dataset. Unlike UNION, UNION ALL does not remove duplicates, so the result will contain all rows from all three queries.

SQL

SELECT employeeName, employeeSurname FROM employees
WHERE City='Toronto'
UNION ALL
SELECT employeeName, employeeSurname FROM employees
WHERE City='New Delhi'
UNION ALL 
SELECT employeeName, employeeSurname 
FROM employees 
WHERE City='New Delhi'

Pyspark

from pyspark.sql.functions import col

df = spark.table("employees") 
df1 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "Toronto") 
df2 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "New Delhi")
df3 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "New Delhi") 
df = df1.union(df2).union(df3)
df.show()

Working on sets – INTERSECT

This query retrieves the employeeName and employeeSurname columns from the employees table. The first query returns the employees whose City is ‘New Delhi’ or ‘Toronto’, while the second returns only the employees from ‘Toronto’. The INTERSECT keyword is used to return only the rows that are common to both datasets, which in this case are the employees from ‘Toronto’.

SQL

SELECT employeeName, employeeSurname FROM employees
WHERE City IN ('New Delhi','Toronto')
INTERSECT
SELECT employeeName, employeeSurname FROM employees
WHERE City='Toronto'

Pyspark

df = spark.table("employees") 
df1 = df.select(col("employeeName"), col("employeeSurname")).filter(df["City"].isin(["Toronto","New Delhi"]))
df2 = df.select(col("employeeName"), col("employeeSurname")).filter(df["City"].isin(["Toronto"]))
df = df1.intersect(df2)
df.show()

Working on sets – EXCEPT

This query retrieves the employeeName and employeeSurname columns from the employees table. The EXCEPT operation is used to return the rows that are in the first dataset but not in the second. In this case, the first query retrieves the rows where the City column is either ‘Toronto’ or ‘New Delhi’, while the second query retrieves the rows where the City column has the value ‘Toronto’.

SQL

SELECT employeeName, employeeSurname FROM employees
WHERE City IN ('Toronto','New Delhi')
EXCEPT
SELECT employeeName, employeeSurname FROM employees
WHERE City='Toronto'

Pyspark

df = spark.table("employees") 
df1 = df.select(col("employeeName"), col("employeeSurname")).filter(df["City"].isin(["Toronto","New Delhi"]))
df2 = df.select(col("employeeName"), col("employeeSurname")).filter(df["City"].isin(["Toronto"]))
df = df1.exceptAll(df2)
df.show()
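
Note that exceptAll keeps duplicate rows, while SQL's EXCEPT removes them. If you need exact EXCEPT (distinct) semantics, subtract can be used instead, reusing df1 and df2 from the example above:

df = df1.subtract(df2)
df.show()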

Window functions – ROW NUMBER

The ROW_NUMBER() function generates a unique row number for each row in the result set.

SQL

SELECT ROW_NUMBER() OVER(ORDER BY Salary DESC) AS RN,* FROM Employees

Pyspark

from pyspark.sql.functions import col, row_number 
from pyspark.sql import Window

df = spark.table("employees") 

windowSpec = Window.orderBy(col("Salary").desc())
df = df.withColumn("RN", row_number().over(windowSpec))
df.show()

Window functions – PARTITION BY

This is a SQL query that assigns a unique row number to each row in the Employees table, based on the order of the Salary column in descending order, but only within each partition defined by the employeeSurname column.

SQL

SELECT ROW_NUMBER() OVER(PARTITION BY employeeSurname ORDER BY Salary DESC) AS RN,* FROM Employees

Pyspark

from pyspark.sql.functions import col, row_number 
from pyspark.sql import Window

df = spark.table("employees") 

windowSpec = Window.partitionBy(col("employeeSurname")).orderBy(col("Salary").desc())
df = df.withColumn("RN", row_number().over(windowSpec))
df.show()

Window functions – Aggregate + OVER

This query calculates the total salary for each unique surname in the Employees table, and returns the surname, salary, and total salary for each employee. The query uses the SUM function with the OVER clause to calculate the total salary for each unique surname. The PARTITION BY clause in the OVER clause specifies that the total salary should be calculated separately for each unique surname, so the SUM function only adds up the salaries of employees with the same surname.

SQL

SELECT employeeSurname,Salary,SUM(Salary) OVER(PARTITION BY employeeSurname) AS TotalSalaryBySurname FROM Employees

Pyspark

from pyspark.sql.functions import col, row_number, sum
from pyspark.sql import Window

df = spark.table("employees") 

windowSpec = Window.partitionBy(col("employeeSurname"))
df = df.withColumn("TotalSalaryBySurname", sum(df["Salary"]).over(windowSpec))
df = df.select(["employeeSurname","Salary","TotalSalaryBySurname"])
df.show()

Window functions – LAG & LEAD

The query uses the LAG and LEAD functions with the OVER clause to calculate the previous and next day’s Quantity values for each day, based on the order of the Date column.

SQL

SELECT Quantity, LAG(Quantity) OVER(ORDER BY Date) AS PreviousDayQuantity, Lead(Quantity) OVER(ORDER BY Date) AS NextDayQuantity 
FROM 
(
SELECT Date,SUM(Quantity) AS Quantity
FROM
Sales
GROUP BY Date
) AS t

Pyspark

from pyspark.sql.functions import lag, lead, sum
from pyspark.sql import Window

df = spark.table("sales")
df = df.groupBy("Date").agg(sum(df["Quantity"]).alias("Quantity"))
df = df.select(
    df["Date"],
    df["Quantity"],
    lag("Quantity").over(Window.orderBy("Date")).alias("PreviousDayQuantity"),
    lead("Quantity").over(Window.orderBy("Date")).alias("NextDayQuantity"),
)
df.show()

Subqueries – Common Table Expression

This is a query that creates a Common Table Expression (CTE) named cte which is used to retrieve the top 5 employees with the highest salaries from the Employees table, sorted in descending order by salary.

SQL

with cte
AS
(select TOP 5 * FROM Employees ORDER BY Salary DESC)
SELECT * FROM cte

Pyspark

from pyspark.sql.functions import desc 
df = spark.table("employees") 
cte = df.select("*").sort(desc("Salary")).limit(5)
cte.show()

Converting – CAST

This query retrieves the employeeId, birthdate, and a new column birthdateText from the Employees table.

The birthdateText column is created using the CAST function, which converts the birthdate column from its original data type to a string data type with a maximum length of 50 characters. The resulting birthdateText column contains the string representation of the birthdate values.

SQL

SELECT employeeId,birthdate,CAST(birthdate AS VARCHAR(50)) AS birthdateText
FROM Employees

Pyspark

df = spark.table("employees") 
df = df.withColumn("birthdateText",df["birthdate"].cast("string") )
df = df.select(["employeeId","birthdate","birthdateText"])
df.show()

In conclusion, it is my hope that the information shared in this article regarding PySpark and its comparison to SQL will be beneficial to your learning journey. I anticipate providing further insights on related topics, including PySpark, Pandas, and Spark, in the future. I invite you to stay tuned for updates.
