{ "cells": [ { "cell_type": "code", "execution_count": 2, "id": "97d93ac1-1ba3-40c1-8ea7-901b691ac2be", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [ "remove-cell" ] }, "outputs": [], "source": [ "import warnings\n", "warnings.filterwarnings('ignore')\n", "\n", "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.getOrCreate()\n", "spark.sparkContext.setLogLevel(\"error\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "cf68f672-caae-483b-aea8-a44ec22a7bdf", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "# Chapter 6: Old SQL, New Tricks - Running SQL on PySpark" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ecba6109-c9d9-4922-a13d-6256a3f74c3a", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "## Introduction" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ca18176e-9457-447a-aa16-365204b7214e", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "This section explains how to use the Spark SQL API in PySpark and compare it with the DataFrame API. It also covers how to switch between the two APIs seamlessly, along with some practical tips and tricks." ] }, { "attachments": {}, "cell_type": "markdown", "id": "60b59fe5-4d05-408b-8807-20f863efc6e4", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "## Running SQL with PySpark\n", "PySpark offers two main ways to perform SQL operations:" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fd138d2d-480f-4a50-bff8-9f583f39d9a1", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "### Using `spark.sql()`\n", "The `spark.sql()` function allows you to execute SQL queries directly." ] }, { "cell_type": "code", "execution_count": 10, "id": "2e2a56b2-226a-4626-84b7-f0ff93e008c6", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [ { "data": { "text/plain": [ "DataFrame[]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Create a table via spark.sql()\n", "spark.sql(\"DROP TABLE IF EXISTS people\")\n", "spark.sql(\"\"\"\n", "CREATE TABLE people USING PARQUET\n", "AS SELECT * FROM VALUES (1, 'Alice', 10), (2, 'Bob', 20), (3, 'Charlie', 30) t(id, name, age)\n", "\"\"\")" ] }, { "cell_type": "code", "execution_count": 11, "id": "192620d0-10cb-4b25-8d5b-7a59b1ac6c56", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+---+\n", "| name|age|\n", "+-------+---+\n", "|Charlie| 30|\n", "+-------+---+\n", "\n" ] } ], "source": [ "# Use spark.sql() to select data from a table\n", "spark.sql(\"SELECT name, age FROM people WHERE age > 21\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "d13f853c-fb81-43f8-b496-57ff80886a3a", "metadata": {}, "source": [ "### Using the PySpark DataFrame API\n", "The PySpark DataFrame API provides equivalent functionality to SQL but with a Pythonic approach." 
] }, { "cell_type": "code", "execution_count": 12, "id": "b905d379-af51-4376-9837-fa32a913951c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+---+\n", "| name|age|\n", "+-------+---+\n", "|Charlie| 30|\n", "+-------+---+\n", "\n" ] } ], "source": [ "# Read a table using the DataFrame API\n", "people_df = spark.read.table(\"people\")\n", "\n", "# Use DataFrame API to select data\n", "people_df.select(\"name\", \"age\").filter(\"age > 21\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "1df50cb1-3178-4f7d-99d7-6137c9ce9c12", "metadata": {}, "source": [ "## SQL vs. DataFrame API in PySpark\n", "When to use which API depends on your background and the specific task:\n", "\n", "**SQL API:**\n", " - Ideal for users with SQL backgrounds who are more comfortable writing SQL queries.\n", "\n", "**DataFrame API:**\n", " - Preferred by Python developers as it aligns with Python syntax and idioms.\n", " - Provides greater flexibility for complex transformations, especially with user-defined functions (UDFs)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "aded4623-8909-4b82-ae1a-d8bd13245f9e", "metadata": {}, "source": [ "### Code Examples: SQL vs. DataFrame API\n", "\n", "Here are some examples comparing how common tasks are performed using the SQL API and PySpark's DataFrame API to give you an idea of their differences and when one might be more suitable than the other." ] }, { "attachments": {}, "cell_type": "markdown", "id": "b9f201db-c440-42c3-81bc-b395a01a5e13", "metadata": {}, "source": [ "#### Example: SELECT and FILTER Operation" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c77022e8-a979-43ce-9b62-ad6adeff6ee4", "metadata": {}, "source": [ "**SQL API:**" ] }, { "cell_type": "code", "execution_count": 15, "id": "afccb0b5-f533-45c4-a8e0-c4fced910dd3", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+\n", "| name|\n", "+-------+\n", "|Charlie|\n", "+-------+\n", "\n" ] } ], "source": [ "spark.sql(\"SELECT name FROM people WHERE age > 21\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "94f2b460-155a-4aa8-a98e-d0ef01693e8e", "metadata": {}, "source": [ "**DataFrame API:**" ] }, { "cell_type": "code", "execution_count": 16, "id": "ddd28264-b8ba-4f95-a62c-e3855c27c759", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+\n", "| name|\n", "+-------+\n", "|Charlie|\n", "+-------+\n", "\n" ] } ], "source": [ "spark.read.table(\"people\").select(\"name\").filter(\"age > 21\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "86ffd501-a658-4819-abc1-721b1e2633a7", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "#### Example: JOIN Operation" ] }, { "cell_type": "code", "execution_count": 18, "id": "63fd3534-9d20-4b96-8ef7-17adc9fc8e4f", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [ { "data": { "text/plain": [ "DataFrame[]" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(\"DROP TABLE IF EXISTS orders\")\n", "spark.sql(\"\"\"\n", "CREATE TABLE orders USING PARQUET \n", "AS SELECT * FROM VALUES (101, 1, 200), (102, 2, 150), (103,3, 300) t(order_id, customer_id, amount)\n", "\"\"\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "75e88196-ef2b-42ed-aa83-4c384ea1f5ef", "metadata": {}, "source": [ "**SQL API:**" ] }, { "cell_type": "code", 
"execution_count": 19, "id": "56cbb734-5f33-496d-8053-079abf19bff8", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+\n", "| name|order_id|\n", "+-------+--------+\n", "|Charlie| 103|\n", "| Alice| 101|\n", "| Bob| 102|\n", "+-------+--------+\n", "\n" ] } ], "source": [ "spark.sql(\"\"\"\n", "SELECT p.name, o.order_id\n", "FROM people p\n", "JOIN orders o ON p.id = o.customer_id\n", "\"\"\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "42d501f1-2906-47e1-b6e6-a2c4798ef7ed", "metadata": {}, "source": [ "**DataFrame API:**" ] }, { "cell_type": "code", "execution_count": 20, "id": "ee9b69ae-f4a6-48f2-ac63-869e22a7089e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------+\n", "| name|order_id|\n", "+-------+--------+\n", "|Charlie| 103|\n", "| Alice| 101|\n", "| Bob| 102|\n", "+-------+--------+\n", "\n" ] } ], "source": [ "people_df = spark.read.table(\"people\")\n", "orders_df = spark.read.table(\"orders\")\n", "(\n", " people_df\n", " .join(orders_df, people_df.id == orders_df.customer_id)\n", " .select(people_df.name, orders_df.order_id)\n", " .show()\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "723cbbcd-a8f2-432a-9138-e4080fe5c391", "metadata": {}, "source": [ "#### Example: GROUP BY and Aggregate Operation" ] }, { "attachments": {}, "cell_type": "markdown", "id": "4634009a-42cc-4452-bda0-56b8300925ac", "metadata": {}, "source": [ "**SQL API:**" ] }, { "cell_type": "code", "execution_count": 21, "id": "6540c85c-e0e9-4ea8-bad5-e7a39b17ef46", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+------------+\n", "| name|total_amount|\n", "+-------+------------+\n", "|Charlie| 300|\n", "| Alice| 200|\n", "| Bob| 150|\n", "+-------+------------+\n", "\n" ] } ], "source": [ "spark.sql(\"\"\"\n", "SELECT p.name, SUM(o.amount) AS total_amount\n", "FROM people p\n", "JOIN orders o ON p.id = o.customer_id\n", "GROUP BY p.name\n", "\"\"\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "8c7f2d9b-0c96-4849-a53e-82573d540f7f", "metadata": {}, "source": [ "**DataFrame API:**" ] }, { "cell_type": "code", "execution_count": 22, "id": "096446a7-b2d5-4e6d-a145-95c26f043aaa", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+------------+\n", "| name|total_amount|\n", "+-------+------------+\n", "|Charlie| 300|\n", "| Alice| 200|\n", "| Bob| 150|\n", "+-------+------------+\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import sum\n", "\n", "(\n", " people_df\n", " .join(orders_df, people_df.id == orders_df.customer_id)\n", " .groupBy(\"name\")\n", " .agg(sum(\"amount\").alias(\"total_amount\"))\n", " .show()\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f4c6aad1-11fd-436a-8cb5-b1b6cde2e41e", "metadata": {}, "source": [ "#### Example: Window Operations" ] }, { "attachments": {}, "cell_type": "markdown", "id": "486e5443-e58e-41ea-8ec3-562817ede628", "metadata": {}, "source": [ "**SQL API:**" ] }, { "cell_type": "code", "execution_count": 23, "id": "a2745d1b-635b-4176-a142-84695b1dde32", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+------+----+\n", "| name|amount|rank|\n", "+-------+------+----+\n", "| Alice| 200| 1|\n", "| Bob| 150| 1|\n", "|Charlie| 300| 1|\n", "+-------+------+----+\n", "\n" ] } ], "source": [ "spark.sql(\"\"\"\n", "SELECT\n", " p.name,\n", " o.amount, \n", " 
RANK() OVER (PARTITION BY p.name ORDER BY o.amount DESC) AS rank\n", "FROM people p\n", "JOIN orders o ON p.id = o.customer_id\n", "\"\"\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "7a0e8085-3c45-43df-946b-9db6aa78dc53", "metadata": {}, "source": [ "**DataFrame API:**" ] }, { "cell_type": "code", "execution_count": 24, "id": "05d94aa3-4338-444e-a98a-b9ba388a2070", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+------+----+\n", "| name|amount|rank|\n", "+-------+------+----+\n", "| Alice| 200| 1|\n", "| Bob| 150| 1|\n", "|Charlie| 300| 1|\n", "+-------+------+----+\n", "\n" ] } ], "source": [ "from pyspark.sql.window import Window\n", "from pyspark.sql.functions import rank\n", "\n", "# Define the window specification\n", "window_spec = Window.partitionBy(\"name\").orderBy(orders_df.amount.desc())\n", "\n", "# Window operation with RANK\n", "(\n", " people_df\n", " .join(orders_df, people_df.id == orders_df.customer_id)\n", " .withColumn(\"rank\", rank().over(window_spec))\n", " .select(\"name\", \"amount\", \"rank\")\n", " .show()\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "054b56db-6f35-4820-bbfe-7d9095ea8f7c", "metadata": {}, "source": [ "#### Example: UNION Operation" ] }, { "attachments": {}, "cell_type": "markdown", "id": "1c13feb7-98f8-4772-a4a9-31f2289370e3", "metadata": {}, "source": [ "**SQL API:**\n", "- The `UNION` operator combines rows from two queries and removes duplicates by default." ] }, { "cell_type": "code", "execution_count": 25, "id": "227897ff-fe7c-4999-b8a8-a1acb193241e", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[]" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(\"CREATE OR REPLACE TEMP VIEW people2 AS SELECT * FROM VALUES (1, 'Alice', 10), (4, 'David', 35) t(id, name, age)\")" ] }, { "cell_type": "code", "execution_count": 26, "id": "8e72401b-021d-4169-97f3-36007c537e85", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+---+\n", "| id| name|age|\n", "+---+-------+---+\n", "| 3|Charlie| 30|\n", "| 1| Alice| 10|\n", "| 2| Bob| 20|\n", "| 4| David| 35|\n", "+---+-------+---+\n", "\n" ] } ], "source": [ "spark.sql(\"\"\"\n", "SELECT * FROM people\n", "UNION\n", "SELECT * FROM people2\n", "\"\"\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "705262ad-8963-4e14-b2c2-bffd909ec06a", "metadata": {}, "source": [ "**DataFrame API:**\n", "- The `union()` method is used to combine two DataFrames, but it does not remove duplicates by default.\n", "- To match the behavior of SQL's `UNION`, use the `dropDuplicates()` method to eliminate duplicates after the union operation."
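, "\n", "- A related note: `union()` pairs columns purely by position. If the column order might differ between inputs, `unionByName()` matches columns by name instead; a minimal sketch follows directly below, before the `union()` examples." ] }, { "cell_type": "code", "execution_count": null, "id": "added-unionbyname", "metadata": {}, "outputs": [], "source": [ "# unionByName() aligns columns by name rather than position, so even with\n", "# people2's columns deliberately reordered here, rows still line up correctly.\n", "people2_reordered = spark.read.table(\"people2\").select(\"age\", \"name\", \"id\")\n", "spark.read.table(\"people\").unionByName(people2_reordered).show()"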
] }, { "cell_type": "code", "execution_count": 27, "id": "3a67f3ee-1179-4d6c-870a-c7982753f707", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+---+\n", "| id| name|age|\n", "+---+-------+---+\n", "| 3|Charlie| 30|\n", "| 1| Alice| 10|\n", "| 2| Bob| 20|\n", "| 1| Alice| 10|\n", "| 4| David| 35|\n", "+---+-------+---+\n", "\n" ] } ], "source": [ "people_df = spark.read.table(\"people\")\n", "people2_df = spark.read.table(\"people2\")\n", "# This will have duplicate values.\n", "people_df.union(people2_df).show()" ] }, { "cell_type": "code", "execution_count": 28, "id": "96e621eb-ca83-4935-85d0-757e7d49fdd0", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+---+\n", "| id| name|age|\n", "+---+-------+---+\n", "| 3|Charlie| 30|\n", "| 1| Alice| 10|\n", "| 2| Bob| 20|\n", "| 4| David| 35|\n", "+---+-------+---+\n", "\n" ] } ], "source": [ "# Remove duplicate values\n", "people_df.union(people2_df).dropDuplicates().show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fbd87a8c-f5b5-41da-bc3e-db78fbfe1785", "metadata": {}, "source": [ "#### Example: SET Configurations" ] }, { "attachments": {}, "cell_type": "markdown", "id": "3e7e46ac-b5b3-4823-8ecb-957c5244482b", "metadata": {}, "source": [ "**SQL API:**" ] }, { "cell_type": "code", "execution_count": 29, "id": "59cb6621-f5be-48e1-9e57-817f7db6be72", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[key: string, value: string]" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(\"SET spark.sql.shuffle.partitions=8\")" ] }, { "cell_type": "code", "execution_count": 30, "id": "567f7905-5296-4651-b062-aa4c70f734ab", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------------------------+-----+\n", "|key |value|\n", "+----------------------------+-----+\n", "|spark.sql.shuffle.partitions|8 |\n", "+----------------------------+-----+\n", "\n" ] } ], "source": [ "spark.sql(\"SET spark.sql.shuffle.partitions\").show(truncate=False)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "142ee012-4d76-4969-bd48-1744b58042f5", "metadata": {}, "source": [ "**DataFrame API:**" ] }, { "cell_type": "code", "execution_count": 31, "id": "af1e497b-af1e-427a-86c6-8ce1381fc4de", "metadata": {}, "outputs": [], "source": [ "spark.conf.set(\"spark.sql.shuffle.partitions\", 10)" ] }, { "cell_type": "code", "execution_count": 32, "id": "bf30b990-ff9d-4aaa-a77f-20b4a868ef25", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'10'" ] }, "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.conf.get(\"spark.sql.shuffle.partitions\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "6f13f062-1701-4c76-b546-ebcee1704ff5", "metadata": {}, "source": [ "#### Example: Listing Tables and Views" ] }, { "attachments": {}, "cell_type": "markdown", "id": "6a538410-4106-4fc1-a2db-5bf646edfe3e", "metadata": {}, "source": [ "**SQL API:**" ] }, { "cell_type": "code", "execution_count": 33, "id": "e25f34ef-5e2e-4379-b600-24fff2f11613", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+---------+-----------+\n", "|namespace|tableName|isTemporary|\n", "+---------+---------+-----------+\n", "| default| orders| false|\n", "| default| people| false|\n", "| | people2| true|\n", "+---------+---------+-----------+\n", "\n" ] } ], "source": [ "spark.sql(\"SHOW 
TABLES\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "2e3e1465-383e-4b00-b40c-794e21ed6228", "metadata": {}, "source": [ "**DataFrame API:**" ] }, { "cell_type": "code", "execution_count": 34, "id": "81ba9737-3d29-4ae7-8672-403b2e553b94", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Name: orders, isTemporary: False\n", "Name: people, isTemporary: False\n", "Name: people2, isTemporary: True\n" ] } ], "source": [ "tables = spark.catalog.listTables()\n", "for table in tables:\n", " print(f\"Name: {table.name}, isTemporary: {table.isTemporary}\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "219912da-50fe-435e-b880-b4d78df4c066", "metadata": {}, "source": [ "### DataFrame API Exclusive Functions\n", "Certain operations are exclusive to the DataFrame API and are not supported in SQL, such as:" ] }, { "attachments": {}, "cell_type": "markdown", "id": "35892dd2-aae6-4789-bd98-ca0dcb914b3b", "metadata": {}, "source": [ "**withColumn**: Adds or modifies columns in a DataFrame.\n" ] }, { "cell_type": "code", "execution_count": 35, "id": "7729d4eb-81f4-430d-8d1e-f6a398f3bc55", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+---+-------+\n", "| id| name|age|new_col|\n", "+---+-------+---+-------+\n", "| 3|Charlie| 30| 40|\n", "| 1| Alice| 10| 20|\n", "| 2| Bob| 20| 30|\n", "+---+-------+---+-------+\n", "\n" ] } ], "source": [ "people_df.withColumn(\"new_col\", people_df[\"age\"] + 10).show()" ] }, { "cell_type": "code", "execution_count": 39, "id": "00d09dc1-d788-43cd-b2d1-a9520ecb704b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+---+\n", "| id| name|age|\n", "+---+-------+---+\n", "| 3|Charlie| 40|\n", "| 1| Alice| 20|\n", "| 2| Bob| 30|\n", "+---+-------+---+\n", "\n" ] } ], "source": [ "people_df.withColumn(\"age\", people_df[\"age\"] + 10).show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "04812924-2295-4b09-9a7d-ae26240a2b68", "metadata": {}, "source": [ "## Using SQL and DataFrame API Interchangeably\n", "PySpark supports switching between SQL and DataFrame API, making it easy to mix and match." ] }, { "attachments": {}, "cell_type": "markdown", "id": "7b4e77bc-47a9-4051-a7eb-c28974465733", "metadata": {}, "source": [ "### Chaining DataFrame Operations on SQL Outputs\n", "PySpark’s DataFrame API allows you to chain multiple operations together to create efficient and readable transformations. " ] }, { "cell_type": "code", "execution_count": 36, "id": "a145c7ba-6567-42ba-8089-cbf4f80e0ecd", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+---+\n", "| name|age|\n", "+-------+---+\n", "|Charlie| 30|\n", "+-------+---+\n", "\n" ] } ], "source": [ "# Chaining DataFrame operations on SQL results\n", "spark.sql(\"SELECT name, age FROM people\").filter(\"age > 21\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ff055c98-76d6-45b2-a3cc-a998b29156be", "metadata": {}, "source": [ "### Using `selectExpr()`\n", "The `selectExpr()` method allows you to run SQL expressions within the DataFrame API." 
] }, { "cell_type": "code", "execution_count": 37, "id": "8784ac95-7cb2-427f-bb6f-58de919fcaf1", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+------------+\n", "| name|age_plus_one|\n", "+-------+------------+\n", "|Charlie| 31|\n", "| Alice| 11|\n", "| Bob| 21|\n", "+-------+------------+\n", "\n" ] } ], "source": [ "people_df.selectExpr(\"name\", \"age + 1 AS age_plus_one\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fc513ce0-e732-4dcc-ac99-12dc58606fa6", "metadata": {}, "source": [ "### Querying a DataFrame in SQL\n", "You can create a temporary view from a DataFrame and run SQL queries on it." ] }, { "cell_type": "code", "execution_count": 38, "id": "64b253c1-3f19-40fb-a434-f02b563d624a", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+---+\n", "| id| name|age|\n", "+---+-------+---+\n", "| 3|Charlie| 30|\n", "+---+-------+---+\n", "\n" ] } ], "source": [ "# First create a temp view on top of the DataFrame.\n", "people_df.createOrReplaceTempView(\"people_view\")\n", "\n", "# Then it can be referenced in SQL.\n", "spark.sql(\"SELECT * FROM people_view WHERE age > 21\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e8e0b0be-8ef7-4228-b2af-97bc5c10b1d1", "metadata": {}, "source": [ "### Use Python User-Defined Functions in SQL\n", "You can register Python user-defined functions (UDFs) for use within SQL queries, enabling custom transformations within SQL syntax." ] }, { "cell_type": "code", "execution_count": 41, "id": "d430a9e2-be51-42dc-ad95-dca0cf1d5d4c", "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------------------+\n", "| name|uppercase_name(name)|\n", "+-------+--------------------+\n", "|Charlie| CHARLIE|\n", "+-------+--------------------+\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import udf\n", "from pyspark.sql.types import StringType\n", "\n", "# Define the UDF\n", "@udf(\"string\")\n", "def uppercase_name(name):\n", " return name.upper()\n", "\n", "# Register the UDF\n", "spark.udf.register(\"uppercase_name\", uppercase_name)\n", "\n", "# Use it in SQL\n", "spark.sql(\"SELECT name, uppercase_name(name) FROM people_view WHERE age > 21\").show()" ] }, { "cell_type": "code", "execution_count": null, "id": "72fc87df-0137-4491-b313-da5c7c172e81", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "", "language": "python", "name": "" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.10" } }, "nbformat": 4, "nbformat_minor": 5 }