{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "0b37fb0a", "metadata": {}, "source": [ "# Chapter 1: DataFrames - A view into your structured data" ] }, { "cell_type": "code", "execution_count": 1, "id": "edfe9cd6", "metadata": { "tags": [ "remove-cell" ] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: pyspark in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (3.5.0)\n", "Requirement already satisfied: py4j==0.10.9.7 in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (from pyspark) (0.10.9.7)\n", "Note: you may need to restart the kernel to use updated packages.\n" ] } ], "source": [ "pip install pyspark" ] }, { "cell_type": "code", "execution_count": 2, "id": "43b0e61f", "metadata": { "tags": [ "remove-cell" ] }, "outputs": [ ], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"Python Spark SQL basic example\") \\\n", " .config(\"spark.some.config.option\", \"some-value\") \\\n", " .getOrCreate()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ae367125", "metadata": {}, "source": [ "This section introduces the most fundamental data structure in PySpark: the DataFrame.\n", "\n", "A DataFrame is a two-dimensional labeled data structure with columns \n", "of potentially different types. You can think of a DataFrame like a spreadsheet, a SQL table, or a dictionary of series objects. \n", "Apache Spark DataFrames support a rich set of APIs (select columns, filter, join, aggregate, etc.) \n", "that allow you to solve common data analysis problems efficiently.\n", "\n", "Compared to traditional relational databases, Spark DataFrames offer several key advantages for big data processing and analytics:\n", "\n", "- **Distributed computing**: Spark distributes data across multiple nodes in a cluster, allowing for parallel processing of big data\n", "- **In-memory processing**: Spark performs computations in memory, which can be significantly faster than disk-based processing\n", "- **Schema flexibility**: Unlike traditional databases, PySpark DataFrames support schema evolution and dynamic typing.\n", "- **Fault tolerance**: PySpark DataFrames are built on top of Resilient Distributed Dataset (RDDs), which are inherently fault-tolerant. \n", "Spark automatically handles node failures and data replication, ensuring data reliability and integrity.\n", "\n", "A note on RDDs: \n", "Direct use of RDDs are no longer supported on Spark Connect as of Spark 4.0.\n", "Interacting directly with Spark DataFrames uses a unified planning and optimization engine, \n", "allowing us to get nearly identical performance across all supported languages on Databricks (Python, SQL, Scala, and R)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "443ebbb1", "metadata": {}, "source": [ "## Create a DataFrame\n", "\n", "There are several ways to create a DataFrame in PySpark." ] }, { "attachments": {}, "cell_type": "markdown", "id": "33ebd507", "metadata": {}, "source": [ "### From a list of dictionaries\n", "\n", "The simplest way is to use the createDataFrame() method like so:" ] }, { "cell_type": "code", "execution_count": 3, "id": "b26403e5", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+---+--------+\n", "|age| name|\n", "+---+--------+\n", "| 30| John D.|\n", "| 25|Alice G.|\n", "| 35| Bob T.|\n", "| 28| Eve A.|\n", "+---+--------+\n", "\n" ] } ], "source": [ "employees = [{\"name\": \"John D.\", \"age\": 30},\n", " {\"name\": \"Alice G.\", \"age\": 25},\n", " {\"name\": \"Bob T.\", \"age\": 35},\n", " {\"name\": \"Eve A.\", \"age\": 28}]\n", "\n", "# Create a DataFrame containing the employees data\n", "df = spark.createDataFrame(employees)\n", "df.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fe7cd086", "metadata": {}, "source": [ "### From a local file\n", "\n", "We can also create a DataFrame from a local CSV file:" ] }, { "cell_type": "code", "execution_count": 4, "id": "b421b87d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------+-----------------+-----------------+\n", "|Employee ID| Role| Location|\n", "+-----------+-----------------+-----------------+\n", "| 19238| Data Analyst| Seattle, WA|\n", "| 19239|Software Engineer| Seattle, WA|\n", "| 19240| IT Specialist| Seattle, WA|\n", "| 19241| Data Analyst| New York, NY|\n", "| 19242| Recruiter|San Francisco, CA|\n", "| 19243| Product Manager| New York, NY|\n", "+-----------+-----------------+-----------------+\n", "\n" ] } ], "source": [ "df = spark.read.csv(\"../data/employees.csv\", header=True, inferSchema=True)\n", "df.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "9e8a5246", "metadata": {}, "source": [ "Or from a local JSON file:" ] }, { "cell_type": "code", "execution_count": 5, "id": "4a2d7fe9", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------+-----------------+-----------------+\n", "|Employee ID| Location| Role|\n", "+-----------+-----------------+-----------------+\n", "| 19238| Seattle, WA| Data Analyst|\n", "| 19239| Seattle, WA|Software Engineer|\n", "| 19240| Seattle, WA| IT Specialist|\n", "| 19241| New York, NY| Data Analyst|\n", "| 19242|San Francisco, CA| Recruiter|\n", "| 19243| New York, NY| Product Manager|\n", "+-----------+-----------------+-----------------+\n", "\n" ] } ], "source": [ "df = spark.read.option(\"multiline\",\"true\").json(\"../data/employees.json\")\n", "df.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "da022789", "metadata": {}, "source": [ "### From an existing DataFrame\n", "\n", "We can even create a DataFrame from another existing DataFrame, by selecting certain columns:" ] }, { "cell_type": "code", "execution_count": 6, "id": "d494632d", "metadata": {}, "outputs": [], "source": [ "employees = [\n", " {\"name\": \"John D.\", \"age\": 30, \"department\": \"HR\"},\n", " {\"name\": \"Alice G.\", \"age\": 25, \"department\": \"Finance\"},\n", " {\"name\": \"Bob T.\", \"age\": 35, \"department\": \"IT\"},\n", " {\"name\": \"Eve A.\", \"age\": 28, \"department\": \"Marketing\"}\n", "]\n", "df = spark.createDataFrame(employees)\n", "\n", "# Select only the name and age columns\n", "new_df = df.select(\"name\", \"age\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "a330bfa9", "metadata": {}, "source": [ "### From a table\n", "\n", "If you have an existing table `table_name` in your Spark environment, you can create a DataFrame like this:" ] }, { "cell_type": "code", "execution_count": 7, "id": "14ad6034", "metadata": { "tags": [ "remove-output" ] }, "outputs": [ ], "source": [ "df = spark.read.table(\"table_name\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "892871a6", "metadata": {}, "source": [ "### From a database\n", "\n", "If your table is in a database, you can use JDBC to read the table into a DataFrame.\n" ] }, { "cell_type": "code", "execution_count": 9, "id": "a40a5d54", "metadata": { "tags": [ "remove-output" ] }, "outputs": [ ], "source": [ "url = \"jdbc:mysql://localhost:3306/mydatabase\"\n", "table = \"employees\"\n", "properties = {\n", " \"user\": \"username\",\n", " \"password\": \"password\"\n", "}\n", "\n", "# Read table into DataFrame\n", "df = spark.read.jdbc(url=url, table=table, properties=properties)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "91c58617", "metadata": {}, "source": [ "## View the DataFrame\n", "\n", "We can use PySpark to view and interact with our DataFrame." ] }, { "attachments": {}, "cell_type": "markdown", "id": "25faacd7", "metadata": {}, "source": [ "### Display the DataFrame\n", "\n", "`df.show()` displays a basic visualization of the DataFrame's contents. From our above `createDataFrame()` example:" ] }, { "cell_type": "code", "execution_count": 10, "id": "6a91ef12", "metadata": {}, "outputs": [], "source": [ "employees = [{\"name\": \"John D.\", \"age\": 30},\n", " {\"name\": \"Alice G.\", \"age\": 25},\n", " {\"name\": \"Bob T.\", \"age\": 35},\n", " {\"name\": \"Eve A.\", \"age\": 28}]\n", "\n", "# Create a DataFrame containing the employees data\n", "df = spark.createDataFrame(employees)" ] }, { "cell_type": "code", "execution_count": 11, "id": "c2ce1c82", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+--------+\n", "|age| name|\n", "+---+--------+\n", "| 30| John D.|\n", "| 25|Alice G.|\n", "| 35| Bob T.|\n", "| 28| Eve A.|\n", "+---+--------+\n", "\n" ] } ], "source": [ "df.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c6ee7c70", "metadata": {}, "source": [ "`df.show()` has 3 optional arguments: `n`, `truncate`, and `vertical`.\n", "\n", "By default, `df.show()` displays up to the first 20 rows of the DataFrame. \n", "We can control the number of rows displayed by passing an argument to the show() method:" ] }, { "cell_type": "code", "execution_count": 12, "id": "01417e41", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+--------+\n", "|age| name|\n", "+---+--------+\n", "| 30| John D.|\n", "| 25|Alice G.|\n", "+---+--------+\n", "only showing top 2 rows\n", "\n" ] } ], "source": [ "df.show(n=2)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "605cd26c", "metadata": {}, "source": [ "The truncate argument controls the length of displayed column values (default value is 20):" ] }, { "cell_type": "code", "execution_count": 13, "id": "b01d5223", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+\n", "|age|name|\n", "+---+----+\n", "| 30| Joh|\n", "| 25| Ali|\n", "| 35| Bob|\n", "| 28| Eve|\n", "+---+----+\n", "\n" ] } ], "source": [ "df.show(truncate=3)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e9bedaa6", "metadata": {}, "source": [ "If we set `vertical` to True, the DataFrame will be displayed vertically with one line per value:" ] }, { "cell_type": "code", "execution_count": 14, "id": "267facfc", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-RECORD 0--------\n", " age | 30 \n", " name | John D. \n", "-RECORD 1--------\n", " age | 25 \n", " name | Alice G. \n", "-RECORD 2--------\n", " age | 35 \n", " name | Bob T. \n", "-RECORD 3--------\n", " age | 28 \n", " name | Eve A. \n", "\n" ] } ], "source": [ "df.show(vertical=True)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "4de10f68", "metadata": {}, "source": [ "### Print the DataFrame schema\n", "\n", "We can view information about the DataFrame schema using the `printSchema()` method:" ] }, { "cell_type": "code", "execution_count": 15, "id": "27481fa9", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- age: long (nullable = true)\n", " |-- name: string (nullable = true)\n", "\n" ] } ], "source": [ "df.printSchema()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "8e90d081", "metadata": {}, "source": [ "## DataFrame Manipulation\n", "\n", "Let's look at some ways we can transform our DataFrames.\n", "\n", "For more detailed information, please see the section about data manipulation, [Chapter 6: Function Junction - Data manipulation with PySpark](https://databricks-eng.github.io/pyspark-cookbook/07-dataprep.html).\n", "\n", "### Rename columns\n", "\n", "We can rename DataFrame columns using the `withColumnRenamed()` method:" ] }, { "cell_type": "code", "execution_count": 16, "id": "65d6dfcb", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+--------+\n", "|age| name|\n", "+---+--------+\n", "| 30| John D.|\n", "| 25|Alice G.|\n", "| 35| Bob T.|\n", "| 28| Eve A.|\n", "+---+--------+\n", "\n", "+---+---------+\n", "|age|full_name|\n", "+---+---------+\n", "| 30| John D.|\n", "| 25| Alice G.|\n", "| 35| Bob T.|\n", "| 28| Eve A.|\n", "+---+---------+\n", "\n" ] } ], "source": [ "df.show()\n", "df2 = df.withColumnRenamed(\"name\", \"full_name\")\n", "df2.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "10f3c03f", "metadata": {}, "source": [ "### Filter rows\n", "\n", "We can filter for employees within a certain age range.\n", "The following `df.filter` will create a new DataFrame with rows that match our age condition:" ] }, { "cell_type": "code", "execution_count": 17, "id": "af133309", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+\n", "|age| name|\n", "+---+-------+\n", "| 30|John D.|\n", "| 28| Eve A.|\n", "+---+-------+\n", "\n" ] } ], "source": [ "filtered_df = df.filter((df[\"age\"] > 26) & (df[\"age\"] < 32))\n", "filtered_df.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c49ea696", "metadata": {}, "source": [ "We can also use `df.where` to get the same result:" ] }, { "cell_type": "code", "execution_count": 18, "id": "a29a0719", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+\n", "|age| name|\n", "+---+-------+\n", "| 30|John D.|\n", "| 28| Eve A.|\n", "+---+-------+\n", "\n" ] } ], "source": [ "where_df = df.where((df[\"age\"] > 26) & (df[\"age\"] < 32))\n", "where_df.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "b6d1026a", "metadata": {}, "source": [ "## DataFrames vs. Tables\n", "A DataFrame is an immutable distributed collection of data, only available in the current Spark session.\n", "\n", "A table is a persistent data structure that can be accessed across multiple Spark sessions.\n", "\n", "If you wish to promote a DataFrame to a table, you can use the `createOrReplaceTempView()` method:" ] }, { "cell_type": "code", "execution_count": 19, "id": "778ad9c5", "metadata": {}, "outputs": [], "source": [ "df.createOrReplaceTempView(\"employees\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0a448326", "metadata": {}, "source": [ "Note that the lifetime of this temporary table is tied to the SparkSession that was used to create this DataFrame.\n", "To persist the table beyond this Spark session, you will need to save it to persistent storage." ] }, { "attachments": {}, "cell_type": "markdown", "id": "bd253f06", "metadata": {}, "source": [ "## Save DataFrame to Persistent Storage\n", "\n", "There are several ways to save a DataFrame to persistent storage in PySpark.\n", "For more detailed information about saving data to your local environment,\n", "please see the section about Data Loading (TODO: add link)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "b72663f5", "metadata": {}, "source": [ "### Save to file-based data source\n", "\n", "For file-based data source (text, parquet, json, etc.), you can specify a custom table path like so: " ] }, { "cell_type": "code", "execution_count": 20, "id": "714d52bd", "metadata": { "tags": [ "remove-output" ] }, "outputs": [], "source": [ "df.write.option(\"path\", \"../dataout\").saveAsTable(\"dataframes_savetable_example\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fcc3fc81", "metadata": {}, "source": [ "Even if the table is dropped, the custom table path and table data will still be there. \n", "\n", "If no custom table path is specified, Spark will write data to a default table path under the warehouse directory. \n", "When the table is dropped, the default table path will be removed too." ] }, { "attachments": {}, "cell_type": "markdown", "id": "c98e0afb", "metadata": {}, "source": [ "### Save to Hive metastore\n", "To save to Hive metastore, you can use the following:" ] }, { "cell_type": "code", "execution_count": 21, "id": "00c35126", "metadata": { "tags": [ "remove-output" ] }, "outputs": [ ], "source": [ "df.write().mode(\"overwrite\").saveAsTable(\"schemaName.tableName\")" ] } ], "metadata": { "celltoolbar": "Tags", "kernelspec": { "display_name": "", "language": "python", "name": "" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.4" } }, "nbformat": 4, "nbformat_minor": 5 }