
Avoiding Issues: Monitoring Query Pushdowns in Databricks Federated Queries

A foreign catalog in Databricks is a specialized type of catalog that enables users to access and query data stored in external databases as if it were part of their own Databricks workspace. Currently, foreign catalogs can be created for multiple sources, including SQL Server, Synapse Analytics, and more. This feature is particularly valuable as it allows you to extend your solution by executing federated queries on data that exists in other systems. However, there is one important aspect to be aware of: query pushdown. In most cases, this feature functions as intended, with Databricks generating SQL queries and sending them to the source system. Nevertheless, there are specific scenarios where you may find that the query is executed by Databricks on the entire dataset extracted from the source system. Let’s explore how this works.

Create Foreign Catalog

Before we test query pushdown, let’s create a foreign catalog. First, we need to add a connection. This option is available through the graphical interface, but you can also use code if you prefer. For simplicity, we will use the GUI today. The screenshots below depict the entire process. One important point to mention is that the current connection to SQL Server supports two authentication methods: SQL Server authentication (username and password) and, more importantly, OAuth, where we can provide a Service Principal that will be used for executing queries. While it would be beneficial to have support for credential passthrough, the existing options are more than sufficient for most purposes, especially considering that only SQL authentication was supported in the past. A connection is just another object in Unity Catalog, so we can assign permissions to it just as we do for any other object.

When the connection is ready, we can add a catalog. However, instead of selecting the standard type, we must choose “Foreign.” The entire process is quite straightforward, and the only details we need to provide are the name of the catalog that will be visible on the Databricks side and the database to which we want to connect.

After completing this entire process, we should see all the objects from the connected SQL Server displayed in a manner similar to that of standard catalogs. However, we will only see those objects to which the user defined in the connection has access.
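For readers who prefer code over the GUI, the same two steps can be sketched with the Databricks SQL statements CREATE CONNECTION and CREATE FOREIGN CATALOG. This is a minimal sketch and not the setup used for the screenshots: the connection name, host, database name, secret scope, and secret keys below are all hypothetical placeholders, and SQL authentication is assumed. The helpers only build the statement text, so they can be inspected before anything is executed.

```python
def create_connection_sql(name, host, port, secret_scope):
    """Build a CREATE CONNECTION statement for SQL Server (SQL authentication).

    The secret keys 'sql-user' and 'sql-password' are hypothetical names;
    use whatever keys exist in your own secret scope.
    """
    return (
        f"CREATE CONNECTION `{name}` TYPE sqlserver\n"
        "OPTIONS (\n"
        f"  host '{host}',\n"
        f"  port '{port}',\n"
        f"  user secret('{secret_scope}', 'sql-user'),\n"
        f"  password secret('{secret_scope}', 'sql-password')\n"
        ")"
    )


def create_foreign_catalog_sql(catalog, connection, database):
    """Build a CREATE FOREIGN CATALOG statement that mirrors one remote database."""
    return (
        f"CREATE FOREIGN CATALOG `{catalog}` USING CONNECTION `{connection}`\n"
        f"OPTIONS (database '{database}')"
    )


# All values below are placeholders chosen for illustration only.
statements = [
    create_connection_sql("sqlserver_conn", "myserver.database.windows.net", 1433, "federation"),
    create_foreign_catalog_sql("remote-sql", "sqlserver_conn", "invoicing"),
]

for statement in statements:
    print(statement)
    # In a Databricks notebook, uncomment the next line to run the statement:
    # spark.sql(statement)
```

Reading the credentials through secret() keeps the username and password out of the notebook; the catalog name is wrapped in backticks because it contains a hyphen.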

Query pushdown test

Let’s begin our test. We will use PySpark with the spark.sql function, but the effect will be the same if you use Spark SQL directly on the foreign catalog. The query below is our first query, which selects all columns from the dbo.invoices table. We then filter the results to include only those rows where invoice_id is equal to 3. Finally, we display the results to execute our query.

df = spark.sql("select * FROM `remote-sql`.dbo.invoices")
df.filter(df.invoice_id==3).show()

To verify what was sent to the SQL Server, you can switch to the Spark Jobs view. Under the SQL/DataFrame section, you will find your executed query. This feature allows you to monitor the queries processed by Spark, providing insights into the interactions between Spark and the SQL Server:

When you view the details of the query you want to analyze, open the Details section and look for the information under External Engine Query; this is the native T-SQL query sent to SQL Server. This view also contains much more information that may be useful to you. As you can see in the screenshot below, the query from PySpark was successfully converted into T-SQL, and all necessary transformations were executed by SQL Server itself:

We can prepare many more queries like this. To test additional functionality, I added a projection, which means that we select only specific columns and then filter the rows:

df = spark.sql("select * FROM `remote-sql`.dbo.invoices")
df.select("invoice_id", "customer_name").filter(df.invoice_id==3).show()

The above query was translated properly, as you can see below:

External engine query: SELECT TOP (21) "invoice_id","customer_name" FROM "dbo"."invoices"  WHERE ("invoice_id" IS NOT NULL) AND ("invoice_id" = 3)
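The same check can also be scripted instead of clicked through. Databricks documents EXPLAIN FORMATTED as a way to view the statement generated for a federated source, so a rough sketch could run it and pull out the remote SQL. Two things here are assumptions: the helper names are made up for this example, and the label searched for in the plan text is simply the one shown in the output above, which may differ in the plan text of your runtime. The sample plan is a hand-written stand-in so that the parsing part can run anywhere.

```python
def explain_formatted(spark, query):
    """Return the EXPLAIN FORMATTED text for a query (requires a Spark session)."""
    # EXPLAIN returns a single row with a single string column holding the plan.
    return spark.sql(f"EXPLAIN FORMATTED {query}").collect()[0][0]


def external_queries(plan_text, marker="External engine query:"):
    """Collect the SQL that follows each occurrence of the marker in the plan text.

    The default marker is an assumption based on the label shown in this article.
    """
    found = []
    for line in plan_text.splitlines():
        position = line.lower().find(marker.lower())
        if position != -1:
            found.append(line[position + len(marker):].strip())
    return found


# Hand-written stand-in for a plan, reusing the remote query shown in this article.
sample_plan = """
(1) Scan
External engine query: SELECT TOP (21) "invoice_id","customer_name" FROM "dbo"."invoices"  WHERE ("invoice_id" IS NOT NULL) AND ("invoice_id" = 3)
"""

queries = external_queries(sample_plan)
print(len(queries), "remote query found")
for remote_sql in queries:
    print(remote_sql)

# In a Databricks notebook, the real plan could be fetched like this:
# plan = explain_formatted(spark, "SELECT * FROM `remote-sql`.dbo.invoices WHERE invoice_id = 3")
# print(external_queries(plan))
```

A result with a WHERE clause that matches your filter suggests the predicate was pushed down; several remote queries for a single statement suggest that Spark is combining the data itself.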

This is a simple example, but you can expect that in most cases, the following operations should be mapped correctly: filters, projections, limits, and some functions (those used for filter expressions and other general functions such as aliases, casts, and sort orders). Additionally, almost all aggregates, Boolean operators, mathematical functions, etc., are supported. The library of supported constructs is therefore quite extensive, but not everything is covered. The first and most important construct that will not be translated is any kind of join, regardless of how you write it. Below is a simple example:

df = spark.sql("select * FROM `remote-sql`.dbo.invoices AS i join `remote-sql`.dbo.invoice_line_items as il ON i.invoice_id = il.invoice_id")
df.show()

When we examine the Spark query plan, we can see that there are numerous operators. This indicates that many operations are being performed by the Spark engine itself, rather than by SQL Server.

In the Details section, you can see that Databricks sent two separate queries to SQL Server, downloaded all the data, and performed the join on its own side.

External engine query: SELECT  "invoice_id","customer_name","date_issued","due_date","total_amount","status" FROM "dbo"."invoices"  WHERE ("invoice_id" IS NOT NULL) 

External engine query: SELECT  "line_item_id","invoice_id","description","quantity","unit_price","line_total" FROM "dbo"."invoice_line_items"  WHERE ("invoice_id" IS NOT NULL)

As you can imagine, this is not an issue for small datasets, but what happens when those tables contain tens or even hundreds of millions of rows? In such cases, we may encounter problems. This situation can arise in various scenarios, especially with joins and window functions, so it is essential to test your queries to ensure they behave as expected. What should we do if we need to perform joins or any other constructs that are not pushed down? Consider using a standard JDBC connection, which allows you to send any query. However, the downside of this approach is that it is not as seamlessly integrated as it is with the Foreign Catalog.
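As a rough illustration of the JDBC route, the sketch below builds the options for Spark's generic jdbc data source and passes the join as the query option, so that SQL Server executes the join itself. The host, database, and credential values are placeholders, and the helper names are made up for this example. Explicit columns are selected because Spark wraps the query in a subquery, and SQL Server rejects a subquery that returns two columns with the same name.

```python
def sqlserver_jdbc_options(host, port, database, query, user, password):
    """Build the options for Spark's generic JDBC reader against SQL Server."""
    return {
        "url": f"jdbc:sqlserver://{host}:{port};databaseName={database}",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "query": query,  # sent to SQL Server as written, wrapped in a subquery by Spark
        "user": user,
        "password": password,
    }


def read_remote_query(spark, options):
    """Run the query on SQL Server and return the result as a DataFrame."""
    return spark.read.format("jdbc").options(**options).load()


# The join from the earlier example, with explicit and uniquely named columns.
join_query = (
    "SELECT i.invoice_id, i.customer_name, il.line_item_id, il.line_total "
    "FROM dbo.invoices AS i "
    "JOIN dbo.invoice_line_items AS il ON i.invoice_id = il.invoice_id"
)

# Placeholder values; in practice read the credentials from a secret scope.
options = sqlserver_jdbc_options(
    host="myserver.database.windows.net",
    port=1433,
    database="invoicing",
    query=join_query,
    user="<user>",
    password="<password>",
)
print(options["url"])

# In a Databricks notebook:
# df = read_remote_query(spark, options)
# df.show()
```

The trade-off is the one described above: the credentials and connection details now live in your code rather than in a governed Unity Catalog connection.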

Summary

In this article, I described the process of querying SQL Server using PySpark through a foreign catalog. I highlighted the importance of monitoring the queries sent to SQL Server to ensure that transformations are executed where you expect them to be. While many operations, such as filters, projections, and aggregates, are supported and mapped correctly, certain constructs, specifically joins, are not translated. If you take away one key point from this text, it should be that performance issues may arise with large datasets when a query is not pushed down. Additionally, remember the option of using a standard JDBC connection for queries that cannot be pushed down. Finally, all the described examples apply to SQL Server; for other types of foreign catalogs, please check the documentation, because they can have different limitations. Thank you for your time!

More information: documentation.
