I probably don't need to convince you to learn PySpark. There are plenty of reasons to do it: PySpark is one of the most widely used big data processing frameworks, providing Python support for large-scale data processing with Apache Spark, which currently dominates the Big Data world. How to learn it? The simplest way is by analogy to something we already know. Today I would like to share a different kind of article: some time ago I put together a comprehensive cheatsheet to help myself master PySpark. On my journey to becoming proficient in Spark, I leveraged my familiarity with SQL to understand DataFrames in PySpark, and this approach proved to be an effective way to build a solid understanding of the library.
To start, let's define what PySpark is. PySpark is the Python API for Apache Spark, an open-source, distributed computing system used for big data processing and analysis. It allows developers to process large amounts of data in a parallel, fast, and efficient manner using Python. Keep in mind that not everything translates directly: Spark has the concept of a table, but in reality a table is just metastore metadata pointing to files in storage.
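All of the PySpark snippets below assume that a SparkSession named spark already exists (on Databricks or in most notebooks it is created for you). If you run the code as a standalone script, a minimal sketch to create one yourself could look like this; the application name is just an example:

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; the app name is arbitrary
spark = SparkSession.builder.appName("pyspark-sql-cheatsheet").getOrCreate()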
To follow along with the PySpark examples below, you can execute the following queries on your cluster:
CREATE TABLE employees (
  employeeId INT,
  employeeName STRING,
  employeeSurname STRING,
  employeeTitle STRING,
  age INT,
  city STRING,
  birthdate DATE,
  salary DECIMAL
);

CREATE OR REPLACE TABLE sales (
  EmployeeId INT,
  Quantity DECIMAL,
  UnitPrice DECIMAL,
  City STRING,
  Date DATE
);

INSERT INTO employees (employeeId, employeeName, employeeSurname, employeeTitle, age, city, birthdate, salary) VALUES
  (1, "John", "Doe", "Manager", 35, "New York", "1986-05-10", 75000),
  (2, "Jane", "Doe", "Developer", 32, "London", "1989-02-15", 65000),
  (3, "Jim", "Smith", "Architect", 40, "Paris", "1980-08-20", 85000),
  (4, "Sarah", "Johnson", "Designer", 29, "Berlin", "1992-12-01", 55000),
  (5, "Michael", "Brown", "Product Manager", 38, "Tokyo", "1984-06-06", 75000),
  (6, "Emily", "Davis", "Data Analyst", 31, "Sydney", "1990-09-12", 65000),
  (7, "David", "Wilson", "Salesperson", 33, "Toronto", "1988-07-01", 55000),
  (8, "William", "Johnson", "Support Engineer", 36, "Beijing", "1985-04-01", 65000),
  (9, "Brian", "Anderson", "Marketing Manager", 37, "Shanghai", "1983-05-15", 75000),
  (10, "James", "Lee", "Operations Manager", 39, "Seoul", "1981-03-01", 85000),
  (11, "Emily", "Parker", "HR Manager", 30, "Dubai", "1991-12-25", 75000),
  (12, "Jacob", "Williams", "Accountant", 34, "New Delhi", "1987-06-01", 65000);

INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (1, 10, 20, 'New York', "2023-03-01");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (1, 5, 10, 'London', "2023-03-01");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (3, 15, 15, 'Paris', "2023-03-02");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (4, 20, 30, 'Berlin', "2023-03-02");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (4, 25, 10, 'Tokyo', "2023-03-03");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (4, 30, 20, 'Sydney', "2023-03-03");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (5, 35, 25, 'Beijing', "2023-03-03");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (6, 40, 30, 'Shanghai', "2023-03-04");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (7, 45, 35, 'New Delhi', "2023-03-05");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (7, 50, 40, 'New Delhi', "2023-03-05");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (9, 55, 45, 'Seoul', "2023-03-05");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (10, 60, 50, 'Rio de Janeiro', "2023-03-05");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (10, 65, 55, 'Paris', "2023-03-06");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (NULL, 70, 60, 'Mexico City', "2023-03-06");
INSERT INTO sales (EmployeeId, Quantity, UnitPrice, City, Date) VALUES (NULL, 75, 65, 'Mumbai', "2023-03-08");
Feel free to adjust it to your needs.
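If you prefer to run the setup from PySpark rather than a SQL cell, each statement can be passed to spark.sql(); a short sketch of the idea (only the first INSERT is shown):

spark.sql("""
CREATE TABLE IF NOT EXISTS employees (
  employeeId INT, employeeName STRING, employeeSurname STRING,
  employeeTitle STRING, age INT, city STRING, birthdate DATE, salary DECIMAL
)
""")
# DATE'...' is an explicit date literal; repeat for the remaining rows
spark.sql("INSERT INTO employees VALUES (1, 'John', 'Doe', 'Manager', 35, 'New York', DATE'1986-05-10', 75000)")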
SELECT all columns from table
The statement is used to retrieve all columns and rows of data from the table.
SQL:
SELECT * FROM employees
PySpark:
df = spark.table("employees")
df.show()
Select specific columns from table
This query retrieves the values of the columns “employeeName,” “employeeSurname,” and “employeeTitle” from the “employees” table.
SQL:
SELECT employeeName,employeeSurname,employeeTitle FROM employees
Pyspark:
df = spark.table("employees")
df = df.select("employeeName", "employeeSurname", "employeeTitle")
df.show()
Order result
This query retrieves the “employeeName”, “employeeSurname”, and “employeeTitle” columns from the “employees” table and sorts the result in ascending order by the “employeeSurname” column.
SQL:
SELECT employeeName,employeeSurname,employeeTitle FROM employees ORDER BY employeeSurname
Pyspark:
df = spark.table("employees")
df = df.select("employeeName", "employeeSurname", "employeeTitle")
df = df.orderBy("employeeSurname")
df.show()
Order result descending
This is a SQL query that retrieves the values of the columns “employeeName”, “employeeSurname”, and “employeeTitle” from a table called “employees”. The results will be ordered in descending order based on the values in the “employeeSurname” column.
SQL:
SELECT employeeName,employeeSurname,employeeTitle FROM employees ORDER BY employeeSurname DESC
Pyspark:
df = spark.table("employees")
df = df.select("employeeName", "employeeSurname", "employeeTitle")
df = df.orderBy(df["employeeSurname"].desc())
df.show()
Select TOP N rows
The query retrieves the “employeeName”, “employeeSurname”, and “employeeTitle” columns from the “employees” table. It limits the number of returned rows to 10, with the first 10 rows being determined by the sorting order specified in the “ORDER BY” clause, which sorts the results by the “employeeSurname” column. This means that the query will return the first 10 employees from the table sorted by their surname.
SQL:
SELECT TOP 10 employeeName,employeeSurname,employeeTitle FROM employees ORDER BY employeeSurname
Pyspark:
df = spark.table("employees")
df = df.select("employeeName", "employeeSurname", "employeeTitle")
df = df.orderBy("employeeSurname")
df = df.limit(10)
df.show()
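Note that TOP is T-SQL syntax; Spark SQL itself does not support it and uses LIMIT instead. If you want to run the same query through spark.sql(), a minimal sketch could look like this:

# Spark SQL equivalent of SELECT TOP 10 ... ORDER BY ...
df = spark.sql("SELECT employeeName, employeeSurname, employeeTitle FROM employees ORDER BY employeeSurname LIMIT 10")
df.show()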
Adding aliases
This query retrieves data from the “employees” table and selects the columns “employeeName”, “employeeSurname”, and “employeeTitle”. The “AS” clause is used to assign new names, or aliases, to each of the selected columns.
SQL:
SELECT employeeName AS Name, employeeSurname AS Surname, employeeTitle AS Title FROM employees
Pyspark:
df = spark.table("employees")
df = df.withColumnRenamed("employeeName", "Name")
df = df.withColumnRenamed("employeeSurname", "Surname")
df = df.withColumnRenamed("employeeTitle", "Title")
df.show()
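As an alternative to chaining withColumnRenamed calls, you can assign the aliases directly in select(), which mirrors the SQL AS clause more closely; a small sketch:

from pyspark.sql.functions import col

df = spark.table("employees")
# col(...).alias(...) corresponds to "column AS alias" in SQL
df = df.select(
    col("employeeName").alias("Name"),
    col("employeeSurname").alias("Surname"),
    col("employeeTitle").alias("Title")
)
df.show()

Keep in mind that this version keeps only the three selected columns, while withColumnRenamed preserves all columns of the DataFrame.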
Filtering – greater than
This query retrieves data from the “employees” table and selects all columns by using the wildcard selector “*”. The “WHERE” clause is used to filter the results based on a condition. In this case, the condition is “Age > 35”, which means that only rows where the value in the “Age” column is greater than 35 will be returned.
SQL:
SELECT * FROM employees WHERE Age>35
Pyspark:
df = spark.table("employees")
df = df.filter(df["Age"] > 35)
df.show()
Filtering – logical AND
The “WHERE” clause is used to filter the results based on multiple conditions. In this case, the conditions are “Age > 35” and “City = ‘Seoul’”. These conditions are combined using the logical operator “AND”, meaning that only rows where both conditions are true will be returned.
SQL:
SELECT * FROM employees WHERE Age>35 AND City = 'Seoul'
Pyspark:
df = spark.table("employees")
df = df.filter((df["Age"] > 35) & (df["City"] == "Seoul"))
df.show()
Filtering – logical OR
The “WHERE” clause is used to filter the results based on multiple conditions. In this case, the conditions are “Age > 35” and “City = ‘Seoul’”. These conditions are combined using the logical operator “OR”, meaning that rows where either one or both conditions are true will be returned.
SQL:
SELECT * FROM employees WHERE Age>35 OR City = 'Seoul'
Pyspark:
df = spark.table("employees")
df = df.filter((df["Age"] > 35) | (df["City"] == "Seoul"))
df.show()
Filtering – wildcard
The “WHERE” clause is used to filter the results based on a condition using a pattern match. In this case, the condition is “City LIKE ‘S%’”, which means that only rows where the value in the “City” column starts with the letter “S” will be returned. The “%” symbol is used as a wildcard character, representing zero or more characters.
SQL:
SELECT * FROM employees WHERE City LIKE 'S%'
Pyspark:
df = spark.table("employees")
df = df.filter(df["City"].like("S%"))
df.show()
Filtering – BETWEEN
The “WHERE” clause is used to filter the results based on a range of values. In this case, the range is defined using the “BETWEEN” operator, with the values ‘19900101’ and ‘20000101’ as the lower and upper bounds, respectively. Only rows where the value in the “BirthDate” column is within this range will be returned.
SQL:
SELECT * FROM employees WHERE BirthDate BETWEEN '19900101' AND '20000101'
Pyspark:
df = spark.table("employees")
df = df.filter(df["BirthDate"].between("1990-01-01", "2000-01-01"))
df.show()
Filtering – not equal
The “WHERE” clause is used to filter the results based on a condition. In this case, the condition is “City <> ‘Toronto’”, which means that only rows where the value in the “City” column is not equal to ‘Toronto’ will be returned.
SQL:
SELECT * FROM employees WHERE City <> 'Toronto'
Pyspark:
df = spark.read.table("employees")
df = df.filter(df["City"] != 'Toronto')
df.show()
Filtering – IN
The “WHERE” clause is used to filter the results based on a condition. In this case, the condition is “City IN (‘Tokyo’,‘London’,‘New York’)”, which means that only rows where the value in the “City” column is equal to one of the cities listed in the parentheses will be returned. The “IN” operator is used to specify a set of values to check for equality.
SQL:
SELECT * FROM employees WHERE City IN ('Tokyo','London','New York')
Pyspark:
df = spark.read.table("employees")
df = df.filter(df["City"].isin(['Tokyo', 'London', 'New York']))
df.show()
Filtering – NOT IN
The “WHERE” clause is used to filter the results based on a condition. In this case, the condition is “City NOT IN (‘Tokyo’,‘London’,‘New York’)”, which means that only rows where the value in the “City” column is not equal to any of the cities listed in the parentheses will be returned. The “NOT IN” operator is used to specify a set of values to check for inequality.
SQL:
SELECT * FROM employees WHERE City NOT IN ('Tokyo','London','New York')
Pyspark:
df = spark.read.table("employees")
df = df.filter(~df["City"].isin(['Tokyo', 'London', 'New York']))
df.show()
Aggregate – COUNT
This query retrieves data from the “employees” table and selects the “City” column and a count of the number of rows for each city.
SQL:
SELECT City,COUNT(*) FROM employees GROUP BY City
Pyspark:
from pyspark.sql.functions import count
df = spark.read.table("employees")
df = df.groupBy("City").agg(count("*").alias("Count")).select("City", "Count")
df.show()
Aggregate – MIN,MAX,AVG,SUM
This query retrieves data from the “employees” table and selects the “City” column and various aggregate functions applied to the “Salary” column for each city.
SQL:
SELECT City,AVG(Salary),MIN(Salary),MAX(Salary),SUM(Salary) FROM employees GROUP BY City
Pyspark:
from pyspark.sql.functions import avg, min, max, sum
df = spark.read.table("employees")
df = df.groupBy("City").agg(
    avg("Salary").alias("avg_salary"),
    min("Salary").alias("min_salary"),
    max("Salary").alias("max_salary"),
    sum("Salary").alias("total_salary")
)
df.show()
Aggregate – HAVING
This query retrieves data from the “employees” table and selects the “City” column and the average salary of employees in each city. The “GROUP BY” clause is used to group the rows in the “employees” table by the “City” column. The “AVG(Salary)” function is used to calculate the average salary of employees in each city. The “HAVING” clause is used to further filter the groups based on the aggregate result. In this case, the condition is “AVG(Salary) > 70000”, which means that only groups where the average salary of employees is greater than 70000 will be returned.
SQL:
SELECT City, AVG(Salary) AS Salary FROM employees GROUP BY City HAVING AVG(Salary) > 70000
Pyspark:
from pyspark.sql.functions import avg, col
df = spark.read.table("employees")
df = (
    df.groupBy("City")
    .agg(avg(col("Salary").cast("double")).alias("Salary"))
    .filter(col("Salary") > 70000)
    .select("City", "Salary")
)
df.show()
SELECT DISTINCT
The “SELECT DISTINCT” statement is used to return only unique values from the “City” column. This means that if there are multiple employees with the same city, the query will only return one instance of that city.
SQL:
SELECT DISTINCT City FROM employees
Pyspark:
df = spark.read.table("employees")
df = df.select("City").distinct()
df.show()
Calculated column
This query performs a simple calculation to add a new column to a dataframe. The calculation multiplies the values of two existing columns in the dataframe (named “Quantity” and “UnitPrice”) to create a new column named “SalesAmount”.
SQL
SELECT Quantity * UnitPrice AS SalesAmount FROM Sales
Pyspark:
df = spark.read.table("sales")
df = df.withColumn("SalesAmount", df["Quantity"] * df["UnitPrice"])
df.show()
Note that withColumn adds a new column; if a column with the given name already exists, it replaces that column instead.
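For example, this small sketch overwrites the existing UnitPrice column in place instead of adding a new one (the 10% increase is just an illustrative value):

df = spark.read.table("sales")
# The column name already exists, so withColumn replaces it
df = df.withColumn("UnitPrice", df["UnitPrice"] * 1.1)
df.show()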
SUBSTRING, LEFT, RIGHT
This query retrieves data from the “employees” table and selects three columns, each using a different string function: “LEFT”, “RIGHT”, and “SUBSTRING”. The “LEFT” function returns the first 2 characters of the city name, the “RIGHT” function returns the last 2 characters, and the “SUBSTRING” function returns 2 characters starting from the third character.
SQL:
SELECT LEFT(City,2), RIGHT(City,2), SUBSTRING(City,3,2) FROM Employees
Pyspark:
from pyspark.sql.functions import substring
df = spark.read.table("employees")
df = df.select(
    df["City"],
    substring(df["City"], 1, 2).alias("left"),
    substring(df["City"], -2, 2).alias("right"),
    substring(df["City"], 3, 2).alias("substring")
)
df.show()
Concatenation
The “CONCAT” function concatenates (joins) two or more strings into a single string. In this case, the first argument to the “CONCAT” function is the string “Employee: ”, followed by the “employeeName” and “employeeSurname” columns, separated by a space.
SQL:
SELECT CONCAT('Employee: ', employeeName, ' ', employeeSurname) FROM Employees
Pyspark
from pyspark.sql.functions import lit, concat
df = spark.read.table("employees")
df = df.select(concat(lit("Employee: "), df["employeeName"], lit(" "), df["employeeSurname"]).alias("full_name"))
df.show()
Filtering – NOT IN and subquery
This query retrieves data from the “employees” table and selects all rows where the city is not present in the “City” column of the “Sales” table.
SQL
SELECT * FROM Employees WHERE City NOT IN (SELECT City FROM Sales)
Pyspark
df1 = spark.read.table("employees")
df2 = spark.read.table("sales")
df = df1.join(df2, "City", "left_anti")
df.show()
Subquery with filtering
This query retrieves data from the “employees” table, creates a subquery that groups the employees by city and counts the number of employees in each city, and then filters the result to only show cities with fewer than 2 employees.
SQL
SELECT City FROM ( SELECT City, COUNT(*) AS Cnt FROM employees GROUP BY City ) AS t WHERE Cnt < 2
Pyspark
from pyspark.sql.functions import count
df = spark.read.table("employees")
df = df.select("City").join(df.groupBy("City").agg(count("*").alias("cnt")), "City").where("cnt < 2")
df.show()
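Since the outer query only needs the city names, the same result can also be produced without the extra join; a simpler equivalent sketch:

from pyspark.sql.functions import count

df = spark.read.table("employees")
# Group, filter on the aggregate, and keep only the city names
df = df.groupBy("City").agg(count("*").alias("cnt")).where("cnt < 2").select("City")
df.show()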
JOIN – Inner join
This query performs a join between the “employees” and “Sales” tables, which combines data from both tables based on the “employeeId” column.
SQL
SELECT e.City, s.Quantity AS Qty FROM employees AS e JOIN Sales AS s ON s.employeeId = e.employeeId
PySpark
df1 = spark.read.table("employees")
df2 = spark.read.table("sales")
df = df1.join(df2, df1["employeeId"] == df2["employeeId"]).select(df1["City"], df2["Quantity"].alias("Qty"))
df.show()
JOIN – Left join
This query performs a left join between the “employees” and “Sales” tables, which combines data from both tables based on the “employeeId” column. The left join returns all rows from the “employees” table and only matching rows from the “Sales” table. If there is no matching data in the “Sales” table for a particular employee, NULL values will be displayed for the “Quantity” column.
SQL
SELECT e.City, s.Quantity AS Qty FROM employees AS e LEFT JOIN Sales AS s ON s.employeeId = e.employeeId
Pyspark
df1 = spark.read.table("employees")
df2 = spark.read.table("sales")
df = df1.join(df2, df1["employeeId"] == df2["employeeId"], "left").select(df1["City"], df2["Quantity"].alias("Qty"))
df.show()
JOIN – right join
This query performs a right join between the “employees” and “Sales” tables, which combines data from both tables based on the “employeeId” column. The right join returns all rows from the “Sales” table and only matching rows from the “employees” table. If there is no matching data in the “employees” table for a particular sale, NULL values will be displayed for the “City” column.
SQL
SELECT e.City, s.Quantity AS Qty FROM employees AS e RIGHT JOIN Sales AS s ON s.employeeId = e.employeeId
Pyspark
df1 = spark.read.table("employees")
df2 = spark.read.table("sales")
df = df1.join(df2, df1["employeeId"] == df2["employeeId"], "right").select(df1["City"], df2["Quantity"].alias("Qty"))
df.show()
JOIN – full join
The query performs a full join between the “employees” table and the “Sales” table, using the “employeeId” column as the join condition. The result will return all rows from both tables, including matching and non-matching rows. For non-matching rows, the values from the other table will be filled with NULL.
SQL
SELECT e.City, s.Quantity AS Qty FROM employees AS e FULL JOIN Sales AS s ON s.employeeId = e.employeeId
Pyspark
df1 = spark.read.table("employees")
df2 = spark.read.table("sales")
df = df1.join(df2, df1["employeeId"] == df2["employeeId"], "full").select(df1["City"], df2["Quantity"].alias("Qty"))
df.show()
JOIN – Cross join
This query performs a cross join between two tables and returns all columns from both. The cross join produces all possible combinations of rows from the two tables.
SQL
SELECT * FROM employees AS e CROSS JOIN Sales AS s
Pyspark
df1 = spark.table("employees")
df2 = spark.table("sales")
df = df1.crossJoin(df2)
df.show()
Working on sets – UNION
This is a query that retrieves the employeeName and employeeSurname columns from the employees table, where the value of the City column is either ‘Toronto’ or ‘New Delhi’. The UNION operation is used to combine two datasets into a single dataset. The result will contain only unique rows, with duplicates removed.
SQL
SELECT employeeName, employeeSurname FROM employees WHERE City='Toronto'
UNION
SELECT employeeName, employeeSurname FROM employees WHERE City='New Delhi'
UNION
SELECT employeeName, employeeSurname FROM employees WHERE City='New Delhi'
PySpark
from pyspark.sql.functions import col
df = spark.table("employees")
df1 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "Toronto")
df2 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "New Delhi")
df3 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "New Delhi")
df = df1.union(df2).union(df3).distinct()
df.show()
Working on sets – UNION ALL
This query retrieves the employeeName and employeeSurname columns from the employees table, where the value of the City column is either ‘Toronto’ or ‘New Delhi’. The UNION ALL keyword is used to combine the results of the three separate datasets into a single dataset. Unlike UNION, UNION ALL does not remove duplicates, so the result will contain all rows from all three datasets.
SQL
SELECT employeeName, employeeSurname FROM employees WHERE City='Toronto'
UNION ALL
SELECT employeeName, employeeSurname FROM employees WHERE City='New Delhi'
UNION ALL
SELECT employeeName, employeeSurname FROM employees WHERE City='New Delhi'
Pyspark
from pyspark.sql.functions import col
df = spark.table("employees")
df1 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "Toronto")
df2 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "New Delhi")
df3 = df.select(col("employeeName"), col("employeeSurname")).filter(col("City") == "New Delhi")
df = df1.union(df2).union(df3)
df.show()
Working on sets – INTERSECT
This query retrieves the employeeName and employeeSurname columns from the employees table. The first dataset contains employees whose City is either ‘New Delhi’ or ‘Toronto’, and the second contains employees whose City is ‘Toronto’. The INTERSECT keyword returns only the rows that appear in both datasets, which in this case means the employees located in Toronto.
SQL
SELECT employeeName, employeeSurname FROM employees WHERE City IN ('New Delhi','Toronto') INTERSECT SELECT employeeName, employeeSurname FROM employees WHERE City='Toronto'
Pyspark
from pyspark.sql.functions import col
df = spark.table("employees")
df1 = df.select(col("employeeName"), col("employeeSurname")).filter(df["City"].isin(["Toronto", "New Delhi"]))
df2 = df.select(col("employeeName"), col("employeeSurname")).filter(df["City"].isin(["Toronto"]))
df = df1.intersect(df2)
df.show()
Working on sets – EXCEPT
This is a query that retrieves the employeeName and employeeSurname columns from the employees table. The EXCEPT operation returns the rows that are in the first dataset but not in the second. In this case, the first SELECT statement retrieves the rows where the City column is either ‘Toronto’ or ‘New Delhi’, while the second SELECT statement retrieves the rows where the City column has the value ‘Toronto’.
SQL
SELECT employeeName, employeeSurname FROM employees WHERE City IN ('Toronto','New Delhi') EXCEPT SELECT employeeName, employeeSurname FROM employees WHERE City='Toronto'
Pyspark
from pyspark.sql.functions import col
df = spark.table("employees")
df1 = df.select(col("employeeName"), col("employeeSurname")).filter(df["City"].isin(["Toronto", "New Delhi"]))
df2 = df.select(col("employeeName"), col("employeeSurname")).filter(df["City"].isin(["Toronto"]))
# exceptAll keeps duplicates (EXCEPT ALL semantics); use subtract() if you need the duplicate-removing SQL EXCEPT
df = df1.exceptAll(df2)
df.show()
Window functions – ROW_NUMBER
The ROW_NUMBER() function generates a unique row number for each row in the result set.
SQL
SELECT ROW_NUMBER() OVER(ORDER BY Salary DESC) AS RN,* FROM Employees
Pyspark
from pyspark.sql.functions import col, row_number
from pyspark.sql import Window
df = spark.table("employees")
windowSpec = Window.orderBy(col("Salary").desc())
df = df.withColumn("RN", row_number().over(windowSpec))
df.show()
Window functions – PARTITION BY
This is a SQL query that assigns a unique row number to each row in the Employees table, based on the order of the Salary column in descending order, but only within each partition defined by the employeeSurname column.
SQL
SELECT ROW_NUMBER() OVER(PARTITION BY employeeSurname ORDER BY Salary DESC) AS RN, * FROM Employees
Pyspark
from pyspark.sql.functions import col, row_number
from pyspark.sql import Window
df = spark.table("employees")
windowSpec = Window.partitionBy(col("employeeSurname")).orderBy(col("Salary").desc())
df = df.withColumn("RN", row_number().over(windowSpec))
df.show()
Window functions – Aggregate + OVER
This query calculates the total salary for each unique surname in the Employees table and returns the surname, salary, and total salary for each employee. The query uses the SUM function with the OVER clause to calculate the total salary for each unique surname. The PARTITION BY clause in the OVER clause specifies that the total salary should be calculated separately for each unique surname, so the SUM function only adds up the salaries for employees with the same surname.
SQL
SELECT employeeSurname,Salary,SUM(Salary) OVER(PARTITION BY employeeSurname) AS TotalSalaryBySurname FROM Employees
Pyspark
from pyspark.sql.functions import col, sum
from pyspark.sql import Window
df = spark.table("employees")
windowSpec = Window.partitionBy(col("employeeSurname"))
df = df.withColumn("TotalSalaryBySurname", sum(df["Salary"]).over(windowSpec))
df = df.select(["employeeSurname", "Salary", "TotalSalaryBySurname"])
df.show()
Window functions – LAG & LEAD
The query uses the LAG and LEAD functions with the OVER clause to calculate the previous and next day’s Quantity values for each day, based on the order of the Date column.
SQL
SELECT Quantity, LAG(Quantity) OVER(ORDER BY Date) AS PreviousDayQuantity, LEAD(Quantity) OVER(ORDER BY Date) AS NextDayQuantity FROM ( SELECT Date, SUM(Quantity) AS Quantity FROM Sales GROUP BY Date ) AS t
Pyspark
from pyspark.sql.functions import lag, lead, sum
from pyspark.sql import Window
df = spark.table("sales")
df = df.groupBy("Date").agg(sum(df["Quantity"]).alias("Quantity"))
df = df.select(
    df["Date"],
    df["Quantity"],
    lag("Quantity").over(Window.orderBy("Date")).alias("PreviousDayQuantity"),
    lead("Quantity").over(Window.orderBy("Date")).alias("NextDayQuantity")
)
df.show()
Subqueries – Common Table Expression
This is a query that creates a Common Table Expression (CTE) named cte which is used to retrieve the top 5 employees with the highest salaries from the Employees table, sorted in descending order by salary.
SQL
WITH cte AS (SELECT TOP 5 * FROM Employees ORDER BY Salary DESC) SELECT * FROM cte
Pyspark
from pyspark.sql.functions import desc
df = spark.table("employees")
cte = df.select("*").sort(desc("Salary")).limit(5)
cte.show()
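If your data lives in a DataFrame rather than a table, you can also keep the CTE syntax itself by registering a temporary view and running the WITH statement through spark.sql(); a sketch of that approach (note that Spark SQL uses LIMIT rather than TOP):

df = spark.table("employees")
# Expose the DataFrame to Spark SQL under a temporary name
df.createOrReplaceTempView("employees_view")
cte = spark.sql("""
    WITH cte AS (
        SELECT * FROM employees_view ORDER BY Salary DESC LIMIT 5
    )
    SELECT * FROM cte
""")
cte.show()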
Converting – CAST
This query retrieves the employeeId, birthdate, and a new column birthdateText from the Employees table.
The birthdateText column is created using the CAST function, which converts the birthdate column from its original data type to a string data type with a maximum length of 50 characters. The resulting birthdateText column contains the string representation of the birthdate values.
SQL
SELECT employeeId, birthdate, CAST(birthdate AS VARCHAR(50)) AS birthdateText FROM Employees
Pyspark
df = spark.table("employees")
df = df.withColumn("birthdateText", df["birthdate"].cast("string"))
df = df.select(["employeeId", "birthdate", "birthdateText"])
df.show()
In conclusion, it is my hope that the information shared in this article regarding PySpark and its comparison to SQL will be beneficial to your learning journey. I anticipate providing further insights on related topics, including PySpark, Pandas, and Spark, in the future. I invite you to stay tuned for updates.