Data Extraction

Data Extraction

This notebook works best an EMR 6.1.0 (Haddop 3.2.1, Spark 3.0.0) cluster with 1 master-node (m5.xlarge) and 3 core-nodes (m5.xlarge).

The pushshift dataset is available in the s3a://bigdatateaching under the /reddit/parquet/ prefix. This notebook provides code to read this data so that you can extract the subreddits that meet your requirements and ready to work with the filtered data!

In this notebook we do the following:

  1. Set up a SparkSession.

  2. Read the submissions and comments data into Pyspark dataframes and do basic exploratory analysis.

  3. Filter the subreddits of interest into new PySpark dataframes, save them into an S3 bucket in your account.

After all these steps, you are ready to work with reddit data for your project!

Set up a SparkSession

# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.2.0

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
Collecting package metadata (current_repodata.json): done
Solving environment: done


==> WARNING: A newer version of conda exists. <==
  current version: 23.3.1
  latest version: 24.3.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=24.3.0



## Package Plan ##

  environment location: /opt/conda

  added / updated specs:
    - openjdk


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2024.3.11  |       h06a4308_0         127 KB
    certifi-2024.2.2           |  py310h06a4308_0         159 KB
    openjdk-11.0.13            |       h87a67e3_0       341.0 MB
    ------------------------------------------------------------
                                           Total:       341.3 MB

The following NEW packages will be INSTALLED:

  openjdk            pkgs/main/linux-64::openjdk-11.0.13-h87a67e3_0 

The following packages will be UPDATED:

  ca-certificates    conda-forge::ca-certificates-2023.11.~ --> pkgs/main::ca-certificates-2024.3.11-h06a4308_0 
  certifi            conda-forge/noarch::certifi-2023.11.1~ --> pkgs/main/linux-64::certifi-2024.2.2-py310h06a4308_0 



Downloading and Extracting Packages
openjdk-11.0.13      | 341.0 MB  |                                       |   0% 
ca-certificates-2024 | 127 KB    |                                       |   0% 

certifi-2024.2.2     | 159 KB    |                                       |   0% 
ca-certificates-2024 | 127 KB    | ##################################### | 100% 

                                                                                
                                                                                

                                                                                
Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Note: you may need to restart the kernel to use updated packages.
Collecting pyspark==3.2.0
  Using cached pyspark-3.2.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.2 (from pyspark==3.2.0)
  Using cached py4j-0.10.9.2-py2.py3-none-any.whl.metadata (1.3 kB)
Using cached py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 23.3.1 -> 24.0
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)
Warning: Ignoring non-Spark config property: fs.s3a.aws.credentials.provider
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-34985a56-686b-443e-ba38-60fa5b96605a;1.0
    confs: [default]
    found org.apache.hadoop#hadoop-aws;3.2.2 in central
    found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 341ms :: artifacts dl 22ms
    :: modules in use:
    com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
    org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-34985a56-686b-443e-ba38-60fa5b96605a
    confs: [default]
    0 artifacts copied, 2 already retrieved (0kB/18ms)
24/04/01 01:09:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
3.2.0

Save the submissions and comments data in the local path

!mkdir -p ./code
%%writefile ./code/process.py

import os
import logging
import argparse

# Import pyspark and build Spark session
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

logging.basicConfig(format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s', level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_dataset_path", type=str, help="Path of dataset in S3")    
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_prefix", type=str, help="s3 output prefix")
    parser.add_argument("--col_name_for_filtering", type=str, help="Name of the column to filter")
    parser.add_argument("--values_to_keep", type=str, help="comma separated list of values to keep in the filtered set")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    logger.info(f"spark version = {spark.version}")
    
    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

   
    # Downloading the data from S3 into a Dataframe
    logger.info(f"going to read {args.s3_dataset_path}")
    df = spark.read.parquet(args.s3_dataset_path, header=True)
    logger.info(f"finished reading files...")
    

    
    # filter the dataframe to only keep the values of interest
    vals = [s.strip() for s in args.values_to_keep.split(",")]
    df_filtered = df.where(col(args.col_name_for_filtering).isin(vals))
    
    # save the filtered dataframes so that these files can now be used for future analysis
    s3_path = f"s3://{args.s3_output_bucket}/{args.s3_output_prefix}"
    logger.info(f"going to write data for {vals} in {s3_path}")
    logger.info(f"shape of the df_filtered dataframe is {df_filtered.count():,}x{len(df_filtered.columns)}")
    df_filtered.write.mode("overwrite").parquet(s3_path)
    
    logger.info(f"all done...")
    
if __name__ == "__main__":
    main()
Overwriting ./code/process.py

Now submit this code to SageMaker Processing Job.

%%time
import time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-project",
    framework_version="3.3",
    role=role,
    instance_count=8,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=3600,
)

# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
output_prefix_logs = f"spark_logs"

# modify this comma separated list to choose the subreddits of interest
subreddits = "technology, Futurology, news"
configuration = [
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.executor.memory": "12g", "spark.executor.cores": "4"},
    }
]

# the dataset contains data for these 3 years
year_list = [2021,2022,2023]
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
CPU times: user 3.24 s, sys: 521 ms, total: 3.76 s
Wall time: 4.26 s
%%time
for yyyy in year_list:
    print(f"going to filter comments data for year={yyyy}")
    s3_dataset_path_commments = f"s3://bigdatateaching/reddit-parquet/comments/year={yyyy}/month=*/*.parquet" # "s3a://bigdatateaching/reddit/parquet/comments/yyyy=*/mm=*/*comments*.parquet"
    output_prefix_data_comments = f"finalproject/comments/yyyy={yyyy}"
    col_name_for_filtering = "subreddit"
    subreddits = "technology, Futurology, news"

    # run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
    spark_processor.run(
        submit_app="./code/process.py",
        arguments=[
            "--s3_dataset_path",
            s3_dataset_path_commments,
            "--s3_output_bucket",
            bucket,
            "--s3_output_prefix",
            output_prefix_data_comments,
            "--col_name_for_filtering",
            col_name_for_filtering,
            "--values_to_keep",
            subreddits,
        ],
        spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
        logs=False,
        configuration=configuration
    )
    # give some time for resources from this iterations to get cleaned up
    # if we start the job immediately we could get insufficient resources error
    time.sleep(60)
going to filter comments data for year=2021
..........................................................................................................................................................................................................!going to filter comments data for year=2022
...................................................................................................................................................................................................................!going to filter comments data for year=2023
...................................................................................................................................................!CPU times: user 2.91 s, sys: 523 ms, total: 3.43 s
Wall time: 50min 39s
INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-01-10-48-257
INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-01-28-59-512
INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-01-47-55-902
%%time
for yyyy in year_list:
    print(f"going to filter submissions data for year={yyyy}")
    s3_dataset_path_submissions = f"s3://bigdatateaching/reddit-parquet/submissions/year={yyyy}/month=*/*.parquet" # "s3a://bigdatateaching/reddit/parquet/submissions/yyyy=*/mm=*/*submissions*.parquet"
    output_prefix_data_submissions = f"finalproject/submissions/yyyy={yyyy}"

    # run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
    spark_processor.run(
        submit_app="./code/process.py",
        arguments=[
             "--s3_dataset_path",
            s3_dataset_path_submissions,
            "--s3_output_bucket",
            bucket,
            "--s3_output_prefix",
            output_prefix_data_submissions,
            "--col_name_for_filtering",
            col_name_for_filtering,
            "--values_to_keep",
            subreddits,
        ],
        spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
        logs=False,
        configuration=configuration
    )
    # give some time for resources from this iterations to get cleaned up
    # if we start the job immediately we could get insufficient resources error
    time.sleep(60)
INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-02-20-34-981
INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-02-40-52-671
INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-03-07-31-547
going to filter submissions data for year=2021
...................................................................................................................................................................................................................................!going to filter submissions data for year=2022
..............................................................................................................................................................................................................................................................................................................!going to filter submissions data for year=2023
.............................................................................................................................!CPU times: user 3.31 s, sys: 585 ms, total: 3.9 s
Wall time: 58min 36s

Read the filtered data

Now that we have filtered the data to only keep submissions and comments from subreddits of interest. Let us read data from the s3 path where we saved the filtered data.

%%time
import sagemaker
session = sagemaker.Session()
bucket = session.default_bucket()
output_prefix_data_comments = "finalproject/comments/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_comments}"
#s3_path = "s3a://sagemaker-us-east-1-527047370587/project/comments/yyyy=2021/425151 part-00000-28396f0a-9f66-4e79-bdee-5a0fcc71cf24-c000.snappy.parquet"
print(f"reading comments from {s3_path}")
comments = spark.read.parquet(s3_path, header=True)
print(f"shape of the comments dataframe is {comments.count():,}x{len(comments.columns)}")
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
reading comments from s3a://sagemaker-us-east-1-527047370587/finalproject/comments/yyyy=*
shape of the comments dataframe is 31,880,148x21
CPU times: user 672 ms, sys: 188 ms, total: 860 ms
Wall time: 7min 13s
24/04/01 02:03:23 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
[Stage 1:======================================================>(275 + 1) / 276]                                                                                
# check counts (ensuring all needed subreddits exist)
comments.groupBy('subreddit').count().show()
[Stage 4:======================================================>(275 + 1) / 276]                                                                                
+----------+--------+
| subreddit|   count|
+----------+--------+
|      news|21525282|
|technology| 7320261|
|Futurology| 3034605|
+----------+--------+
comments.printSchema()
root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
# display a subset of columns
comments.select("subreddit", "author", "body", "parent_id", "link_id", "id", "created_utc").show()
+----------+--------------------+--------------------+----------+---------+-------+-------------------+
| subreddit|              author|                body| parent_id|  link_id|     id|        created_utc|
+----------+--------------------+--------------------+----------+---------+-------+-------------------+
|      news|            Sulaco99|           Why wait?|t1_gid6yr4|t3_krvwkf|gid7nhx|2021-01-07 00:22:43|
|      news|  jeopardy_themesong|The tweet is gone...|t1_gicxxdr|t3_krvwkf|gid7ni1|2021-01-07 00:22:43|
|      news|   West_Incident9552|You should probab...|t1_gid72ei|t3_krvwkf|gid7njz|2021-01-07 00:22:43|
|      news|      StatusReality4|You wouldn't be a...|t1_gid6aea|t3_krvwkf|gid7nm5|2021-01-07 00:22:44|
|      news|              5omkiy|I believe the ter...|t1_gid79xs|t3_krvwkf|gid7nmz|2021-01-07 00:22:44|
|      news|      AnneONymous125|Peep the video I ...|t1_gid705n|t3_krzopk|gid7nnk|2021-01-07 00:22:44|
|      news|       SpeedflyChris|He's sent a viole...|t1_gid77lu|t3_krzopk|gid7no0|2021-01-07 00:22:45|
|      news|Anothernamelesacount|As much as I woul...|t1_gid606v|t3_krzopk|gid7nqo|2021-01-07 00:22:46|
|      news|           [deleted]|Snurchy, i think ...|t1_gid2a7o|t3_kr8gb6|gid7nrf|2021-01-07 00:22:46|
|Futurology|           FuzziBear|i don’t think it’...|t1_gicw2h8|t3_krn3ft|gid7nsa|2021-01-07 00:22:46|
|      news|           JustJeezy|I’m trying not to...|t1_gid781s|t3_krzopk|gid7ntz|2021-01-07 00:22:47|
|      news|BretTheShitmanFart69|It seems like thi...|t1_gid6ap7|t3_krzopk|gid7nwx|2021-01-07 00:22:48|
|      news|         BingoBarnes|         Gravy Seals|t1_gid5pzu|t3_krzopk|gid7nzk|2021-01-07 00:22:49|
|      news|       Spocks_Goatee|Can you back up y...|t1_gid7fvq|t3_krvwkf|gid7o0r|2021-01-07 00:22:49|
|      news|          prodigymix|You must've just ...|t1_gid7i9c|t3_krvwkf|gid7o20|2021-01-07 00:22:49|
|      news|      DontDropThSoap|They will be prot...|t1_gid71o5|t3_krzopk|gid7o3o|2021-01-07 00:22:50|
|      news|           [deleted]|           [deleted]|t1_gid79xs|t3_krvwkf|gid7o5g|2021-01-07 00:22:51|
|      news|         mgd09292007|Today I found mys...| t3_krvwkf|t3_krvwkf|gid7o5p|2021-01-07 00:22:51|
|      news|           [deleted]|           [removed]| t3_krzopk|t3_krzopk|gid7o5r|2021-01-07 00:22:51|
|      news|           [deleted]|           [deleted]|t1_gid76dl|t3_krvwkf|gid7o6b|2021-01-07 00:22:51|
+----------+--------------------+--------------------+----------+---------+-------+-------------------+
only showing top 20 rows
%%time
output_prefix_data_submissions = f"finalproject/submissions/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_submissions}"
print(f"reading submissions from {s3_path}")
submissions = spark.read.parquet(s3_path, header=True)
print(f"shape of the submissions dataframe is {submissions.count():,}x{len(submissions.columns)}")
reading submissions from s3a://sagemaker-us-east-1-527047370587/finalproject/submissions/yyyy=*
shape of the submissions dataframe is 1,094,770x68
CPU times: user 132 ms, sys: 26.4 ms, total: 159 ms
Wall time: 2min 59s
24/04/01 03:22:03 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 9:=======================================================>(99 + 1) / 100]                                                                                
# check counts (ensuring all needed subreddits exist)
submissions.groupBy('subreddit').count().show()
[Stage 12:======================================================>(99 + 1) / 100]                                                                                
+----------+------+
| subreddit| count|
+----------+------+
|      news|868430|
|technology|181596|
|Futurology| 44744|
+----------+------+
submissions.printSchema()
root
 |-- adserver_click_url: string (nullable = true)
 |-- adserver_imp_pixel: string (nullable = true)
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- brand_safe: boolean (nullable = true)
 |-- contest_mode: boolean (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- crosspost_parent: string (nullable = true)
 |-- crosspost_parent_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- approved_at_utc: string (nullable = true)
 |    |    |-- approved_by: string (nullable = true)
 |    |    |-- archived: boolean (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- author_flair_css_class: string (nullable = true)
 |    |    |-- author_flair_text: string (nullable = true)
 |    |    |-- banned_at_utc: string (nullable = true)
 |    |    |-- banned_by: string (nullable = true)
 |    |    |-- brand_safe: boolean (nullable = true)
 |    |    |-- can_gild: boolean (nullable = true)
 |    |    |-- can_mod_post: boolean (nullable = true)
 |    |    |-- clicked: boolean (nullable = true)
 |    |    |-- contest_mode: boolean (nullable = true)
 |    |    |-- created: double (nullable = true)
 |    |    |-- created_utc: double (nullable = true)
 |    |    |-- distinguished: string (nullable = true)
 |    |    |-- domain: string (nullable = true)
 |    |    |-- downs: long (nullable = true)
 |    |    |-- edited: boolean (nullable = true)
 |    |    |-- gilded: long (nullable = true)
 |    |    |-- hidden: boolean (nullable = true)
 |    |    |-- hide_score: boolean (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- is_crosspostable: boolean (nullable = true)
 |    |    |-- is_reddit_media_domain: boolean (nullable = true)
 |    |    |-- is_self: boolean (nullable = true)
 |    |    |-- is_video: boolean (nullable = true)
 |    |    |-- likes: string (nullable = true)
 |    |    |-- link_flair_css_class: string (nullable = true)
 |    |    |-- link_flair_text: string (nullable = true)
 |    |    |-- locked: boolean (nullable = true)
 |    |    |-- media: string (nullable = true)
 |    |    |-- mod_reports: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_comments: long (nullable = true)
 |    |    |-- num_crossposts: long (nullable = true)
 |    |    |-- num_reports: string (nullable = true)
 |    |    |-- over_18: boolean (nullable = true)
 |    |    |-- parent_whitelist_status: string (nullable = true)
 |    |    |-- permalink: string (nullable = true)
 |    |    |-- pinned: boolean (nullable = true)
 |    |    |-- quarantine: boolean (nullable = true)
 |    |    |-- removal_reason: string (nullable = true)
 |    |    |-- report_reasons: string (nullable = true)
 |    |    |-- saved: boolean (nullable = true)
 |    |    |-- score: long (nullable = true)
 |    |    |-- secure_media: string (nullable = true)
 |    |    |-- selftext: string (nullable = true)
 |    |    |-- selftext_html: string (nullable = true)
 |    |    |-- spoiler: boolean (nullable = true)
 |    |    |-- stickied: boolean (nullable = true)
 |    |    |-- subreddit: string (nullable = true)
 |    |    |-- subreddit_id: string (nullable = true)
 |    |    |-- subreddit_name_prefixed: string (nullable = true)
 |    |    |-- subreddit_type: string (nullable = true)
 |    |    |-- suggested_sort: string (nullable = true)
 |    |    |-- thumbnail: string (nullable = true)
 |    |    |-- thumbnail_height: string (nullable = true)
 |    |    |-- thumbnail_width: string (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- ups: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- user_reports: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- view_count: string (nullable = true)
 |    |    |-- visited: boolean (nullable = true)
 |    |    |-- whitelist_status: string (nullable = true)
 |-- disable_comments: boolean (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- domain_override: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- embed_type: string (nullable = true)
 |-- embed_url: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- hidden: boolean (nullable = true)
 |-- hide_score: boolean (nullable = true)
 |-- href_url: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imp_pixel: string (nullable = true)
 |-- is_crosspostable: boolean (nullable = true)
 |-- is_reddit_media_domain: boolean (nullable = true)
 |-- is_self: boolean (nullable = true)
 |-- is_video: boolean (nullable = true)
 |-- link_flair_css_class: string (nullable = true)
 |-- link_flair_text: string (nullable = true)
 |-- locked: boolean (nullable = true)
 |-- media: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- oembed: struct (nullable = true)
 |    |    |-- author_name: string (nullable = true)
 |    |    |-- author_url: string (nullable = true)
 |    |    |-- cache_age: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- provider_name: string (nullable = true)
 |    |    |-- provider_url: string (nullable = true)
 |    |    |-- thumbnail_height: long (nullable = true)
 |    |    |-- thumbnail_url: string (nullable = true)
 |    |    |-- thumbnail_width: long (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- reddit_video: struct (nullable = true)
 |    |    |-- dash_url: string (nullable = true)
 |    |    |-- duration: long (nullable = true)
 |    |    |-- fallback_url: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- hls_url: string (nullable = true)
 |    |    |-- is_gif: boolean (nullable = true)
 |    |    |-- scrubber_media_url: string (nullable = true)
 |    |    |-- transcoding_status: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- media_embed: struct (nullable = true)
 |    |-- content: string (nullable = true)
 |    |-- height: long (nullable = true)
 |    |-- scrolling: boolean (nullable = true)
 |    |-- width: long (nullable = true)
 |-- mobile_ad_url: string (nullable = true)
 |-- num_comments: long (nullable = true)
 |-- num_crossposts: long (nullable = true)
 |-- original_link: string (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- parent_whitelist_status: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- pinned: boolean (nullable = true)
 |-- post_hint: string (nullable = true)
 |-- preview: struct (nullable = true)
 |    |-- enabled: boolean (nullable = true)
 |    |-- images: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- variants: struct (nullable = true)
 |    |    |    |    |-- gif: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- mp4: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- nsfw: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- obfuscated: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |-- promoted: boolean (nullable = true)
 |-- promoted_by: string (nullable = true)
 |-- promoted_display_name: string (nullable = true)
 |-- promoted_url: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- secure_media: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- oembed: struct (nullable = true)
 |    |    |-- author_name: string (nullable = true)
 |    |    |-- author_url: string (nullable = true)
 |    |    |-- cache_age: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- provider_name: string (nullable = true)
 |    |    |-- provider_url: string (nullable = true)
 |    |    |-- thumbnail_height: long (nullable = true)
 |    |    |-- thumbnail_url: string (nullable = true)
 |    |    |-- thumbnail_width: long (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- secure_media_embed: struct (nullable = true)
 |    |-- content: string (nullable = true)
 |    |-- height: long (nullable = true)
 |    |-- media_domain_url: string (nullable = true)
 |    |-- scrolling: boolean (nullable = true)
 |    |-- width: long (nullable = true)
 |-- selftext: string (nullable = true)
 |-- spoiler: boolean (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- suggested_sort: string (nullable = true)
 |-- third_party_trackers: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- third_party_tracking: string (nullable = true)
 |-- third_party_tracking_2: string (nullable = true)
 |-- thumbnail: string (nullable = true)
 |-- thumbnail_height: long (nullable = true)
 |-- thumbnail_width: long (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- whitelist_status: string (nullable = true)
# display a subset of columns
submissions.select("subreddit", "author", "title", "selftext", "created_utc", "num_comments").show()
+----------+--------------------+--------------------+---------+-------------------+------------+
| subreddit|              author|               title| selftext|        created_utc|num_comments|
+----------+--------------------+--------------------+---------+-------------------+------------+
|      news|           [deleted]|New Warp Drive Mo...|[deleted]|2021-03-30 10:52:37|           0|
|      news|First-Situation-1384|UNICAL CES admiss...|         |2021-03-30 10:54:26|           0|
|      news|             bustead|Attack on Asian W...|         |2021-03-30 10:55:12|           0|
|      news|     Som2ny-Official|'Nomadland' wins ...|         |2021-03-30 10:55:50|           0|
|      news|           pm30music|دانلود آهنگ کردی ...|         |2021-03-30 10:56:28|           0|
|      news|    Anon-fickleflake|Ireland to promot...|         |2021-03-30 10:57:10|           0|
|      news|First-Situation-1384|Newspaper Mistake...|         |2021-03-30 10:57:29|           0|
|      news| DPRK_JUCHE_IDEOLOGY|River Improvement...|         |2021-03-30 10:57:30|           0|
|      news|       Benjamin-Info|HOW TO SELL WITH ...|         |2021-03-30 10:57:34|           0|
|      news|        sportifynews|PayPal will let U...|         |2021-03-30 10:58:31|           0|
|      news|  Powerful-Flow-5496|      women's boxers|         |2021-03-30 10:59:02|           0|
|      news|     ana_news_agency|علاقة وثيقة بين ط...|         |2021-03-30 11:00:02|           0|
|      news|   rainbowarriorhere|8.2 TB Of MobiKwi...|         |2021-03-30 11:00:37|           0|
|      news|First-Situation-1384|BBnaija star, Ifu...|         |2021-03-30 11:00:45|           0|
|      news|           [deleted]|MobiKwik data bre...|[deleted]|2021-03-30 11:00:55|           0|
|      news|     Som2ny-Official|  Women's March 2020|         |2021-03-30 11:01:20|           0|
|      news|   rainbowarriorhere|Mobikwik denies d...|         |2021-03-30 11:01:21|           6|
|technology|   rainbowarriorhere|Mobikwik denies d...|         |2021-03-30 11:01:40|           0|
|      news| Right-Bathroom-5287|Adani denies enga...|         |2021-03-30 11:02:08|           0|
|      news|    popularnewsindia|Top 5 Luxury Car ...|         |2021-03-30 11:04:20|           0|
+----------+--------------------+--------------------+---------+-------------------+------------+
only showing top 20 rows