
Step 1 - Preprocessing

Inspecting the data

The dataset can be found here. I’m going to go with the ratings, metadata and keywords for this classifier. The columns that I’m interested in and will work with are:

  • ratings.csv — all of them
  • metadata.csv — [id, budget, genres, popularity, runtime, revenue, original_language, production_companies, vote_count, vote_average]
  • keywords.csv — all of them

These are the columns with the fewest null values and the most information. I’m not going to do any Pandas data analysis here, because Kaggle has already done it for me. Personally, I keep all my cool datasets on S3, so importing them into the MLOps platform is easy: just add the S3 paths under “Create Datasource”. It should look something like this:

Datasource

Let’s get crackin’

First things first. This is my __main__.

if __name__ == "__main__":
    mlops = SparkProcessor()

    df_ratings = mlops.read(
        database_name='mlops', table_name='datasource_4ef07c20-24c6-4db1-87e9-93bfef3b545c_ratings_csv').coalesce(2)
    df_keywords = mlops.read(
        database_name='mlops', table_name='datasource_7a250c2f-f720-4faa-b928-de5134f066af_keywords_csv').coalesce(2)
    df_meta = mlops.read(
        database_name='mlops', table_name='datasource_76d6c4bb-ca52-4866-91b0-22a8487c7162_movies_metadata_csv').coalesce(2)

    result_df = my_transformations(df_keywords, df_ratings, df_meta)
    label_cols = result_df.schema.names[-4:]

    mlops.write(
        result_df,
        label_columns=label_cols,
        coalesce=False,
        output_format="parquet"
    )

Importing the DataFrames might seem a bit obscure, but since I generated the code from the console, I get the UUID names of the version-tracked tables. After reading the data, we call a function that does all our transformations, and then we write the resulting DataFrame to disk, specifying the output format and the label columns (used for statistics and training). Using the MLOps read and write functions also lets you trigger this job on a schedule, and it will only process data it has not yet seen (pipeline gold).

CreateDataset

Note

Saving this dataset in Parquet instead of CSV saves about two orders of magnitude of disk space. Initial dataset size: ~700 MB, transformed CSV: ~900 MB, transformed Parquet: ~14 MB.
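If you want to sanity check that difference yourself outside the platform, a minimal sketch (the bucket paths are made up) is to write the same DataFrame in both formats and compare the output sizes in S3:

# Hypothetical paths: write the same DataFrame as CSV and as Parquet, then compare sizes
result_df.write.mode("overwrite").option("header", True).csv("s3://my-bucket/movies/transformed_csv/")
result_df.write.mode("overwrite").parquet("s3://my-bucket/movies/transformed_parquet/")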

Oh btw, don’t forget to add some imports, it might help.

from pyspark.sql import functions as f
from pyspark.sql import DataFrame, SparkSession
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, MaxAbsScaler, OneHotEncoder, StandardScaler, Bucketizer
from pyspark.ml import Pipeline
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    DoubleType
)
import sys
import random
import json
from mlops.processing.spark import SparkProcessor

Transformations

So to summarize what I want to accomplish with the data:

1. Get the IDs from all the nested columns [genres, keywords, production_companies] and stringify them, so that, for example, a combination of genre_1 + genre_2 can be seen as a feature.
2. Convert all string columns [original_language, plus the columns from (1)] to numerical classes.
3. Normalize and scale all feature columns.
4. One-hot encode my ratings (labels), since I will be using TensorFlow’s CategoricalCrossentropy as the loss function. This is optional; if you are using Spark ML or scikit-learn, you could just be satisfied with a single column of integers instead.
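To make steps 1 and 2 concrete, here is a toy sketch (the IDs and data are made up, not from the actual dataset) of how an array of genre IDs is joined into a single string and then indexed into a numeric class:

# Toy example of steps 1-2: join the ID array into one string, then index it
spark = SparkSession.builder.getOrCreate()
toy = spark.createDataFrame(
    [(["12", "16"],), (["18"],), (["12", "16"],)], ["genre_ids"])

toy = toy.withColumn("genre_join", f.array_join(f.col("genre_ids"), delimiter="_"))
# genre_join: "12_16", "18", "12_16" -- each unique combination is its own category

indexed = StringIndexer(inputCol="genre_join", outputCol="genreIndex").fit(toy).transform(toy)
# genreIndex: "12_16" -> 0.0 (most frequent value), "18" -> 1.0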

Parsing dict columns

So let’s start with the boring, annoying part: figuring out how to parse these poorly structured Python dicts (they could at least have bothered doing a json.dumps()). So this is my oh-my-god helper function:

def parse_json_array(data: str):
    # The nested columns are stringified Python dicts, not valid JSON, so we
    # patch them up first and then collect the 'id' of every item.
    if data is not None:
        data_cleaned = (
            data.replace("None", "null")
            .replace("'", '"')
            .replace('\r', '')
            .replace('\n', '')
            .replace('""', '"')
            .replace('\xa0', '')
        )
        try:
            data_parsed = json.loads(data_cleaned)
            return [item['id'] for item in data_parsed]
        except Exception:
            # Anything that still fails to parse becomes an empty list
            return []
    else:
        return []


def to_array(col):
    # Converts an ML Vector column (e.g. the one-hot output) into a plain array of doubles
    def to_array_(v):
        return v.toArray().tolist()
    return f.udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)
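To see what the cleanup actually does, here is a quick example (the sample string is made up, but mirrors the raw format of the keywords column):

# A made-up sample in the same shape as the raw keywords column
raw = "[{'id': 931, 'name': 'jealousy'}, {'id': 4290, 'name': 'toy'}]"

parse_json_array(raw)   # -> [931, 4290]
parse_json_array(None)  # -> []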

So now that we got that out of the way, let’s dive into the my_transformations() function. I’m going to start with the parsing: I run a UDF on each JSON column, passing each row to the helper above, so we go from a stringified Python dict to a Spark ArrayType filled with integers (the IDs):

def my_transformations(df_keywords: DataFrame, df_ratings: DataFrame, df_meta: DataFrame) -> DataFrame:
    json_array_schema = ArrayType(IntegerType())
    udf_id_parser = f.udf(lambda x: parse_json_array(x), json_array_schema)

    df_kw_parsed = df_keywords.select(
        [
            f.col('id').cast('int'),
            udf_id_parser(df_keywords["keywords"]).alias("keyword_ids")
        ]
    ).withColumn('size', f.size(f.col('keyword_ids'))).filter(f.col('size') >= 1)

    df_meta_parsed = df_meta.select(
        [
            f.col('id').cast('int'),
            'original_language',
            'popularity',
            'runtime',
            'revenue',
            'vote_average',
            'vote_count',
            udf_id_parser(df_meta['production_companies']).alias(
                'production_company_ids'),
            f.col('budget').cast('int'),
            udf_id_parser(df_meta["genres"]).alias("genre_ids")
        ]
    ).withColumn('size', f.size(f.col('genre_ids'))).filter(f.col('size') >= 1)

    df_ratings = df_ratings.select(
        f.col('movieid').alias('id'),
        f.col('rating').cast('float'),
        f.col('timestamp').cast('int')
    ).na.drop()

As you can see above, I’m also using my own special “null value” handler at the end of df_kw_parsed and df_meta_parsed to drop all rows whose arrays are empty. Next up, I will join my three DataFrames together, repartition the data evenly onto the Spark executors, and then cast a bunch of columns. You will also notice that I use the Spark built-in function array_join, which stringifies my arrays so that I can create integer classes out of them later.

# Continuing inside my_transformations()

df_joined = df_ratings.join(
    f.broadcast(df_meta_parsed.join(
        df_kw_parsed,
        on='id',
        how='left'
    )),
    on='id',
    how='inner'
).repartition(20)

df_joined = df_joined.select(
    'id',
    'rating',
    'original_language',
    'budget',
    'timestamp',
    f.col('popularity').cast('float'),
    f.col('vote_count').cast('float'),
    f.col('vote_average').cast('float'),
    f.col('runtime').cast('float'),
    f.col('revenue').cast('float'),
    f.array_join(df_joined['production_company_ids'],
                 delimiter='_').alias('production_company_join'),
    f.array_join(df_joined['genre_ids'],
                 delimiter='_').alias('genre_join'),
    f.array_join(df_joined['keyword_ids'], delimiter='_').alias('kw_join')
)

Once that hurdle is over, I will create my first Spark Pipeline, which takes all my string columns and turns each unique value into its own integer class. I will also take my ratings column and use Bucketizer. Since the ratings are a bit skewed in number of samples, I will use this function to bucket ratings into unified classes, reducing the number of classes from 10 to 4.

# Continuing inside my_transformations()

index_pipeline = Pipeline(stages=[
    StringIndexer(inputCol='genre_join',
                  outputCol='genreIndex').setHandleInvalid('skip'),
    StringIndexer(inputCol='production_company_join',
                  outputCol='pcIndex').setHandleInvalid('skip'),
    StringIndexer(inputCol='kw_join',
                  outputCol='kwIndex').setHandleInvalid('skip'),
    StringIndexer(inputCol='original_language',
                  outputCol='langIndex').setHandleInvalid('skip'),
    Bucketizer(splits=[0.0, 2.5, 3.5, 4.5, 5.0], inputCol='rating',
               outputCol='ratingIndex').setHandleInvalid('skip')
])
df_indexed = index_pipeline.fit(df_joined).transform(df_joined).select(
    'budget', 'popularity', 'timestamp', 'vote_count', 'vote_average', 'runtime', 'revenue', 'pcIndex', 'genreIndex', 'kwIndex', 'langIndex', 'ratingIndex'
)
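For reference, given the splits above (and the fact that Bucketizer’s last bucket includes its upper edge), the half-star ratings fold into four classes like this:

# [0.0, 2.5) -> class 0.0   (ratings 0.5 - 2.0)
# [2.5, 3.5) -> class 1.0   (ratings 2.5 - 3.0)
# [3.5, 4.5) -> class 2.0   (ratings 3.5 - 4.0)
# [4.5, 5.0] -> class 3.0   (ratings 4.5 - 5.0)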

Alright, we have two things left to do: scaling and one-hot encoding. So let’s start with scaling, which I will do for all features. I will use the Spark StandardScaler, which normalizes and then scales, because I’m lazy. And honestly, because we are doing movie ratings.

# Continuing inside my_transformations()

# Pull the single value out of each one-element scaled vector and round it to 3 decimals
unlist = f.udf(lambda x: round(float(list(x)[0]), 3), DoubleType())

# Iterating over columns to be scaled
for i in ['budget', 'popularity', 'timestamp', 'vote_count', 'vote_average', 'runtime', 'revenue', 'pcIndex', "genreIndex", "kwIndex", 'langIndex']:
    assembler = VectorAssembler(inputCols=[i], outputCol=i+"_vect")
    assembler.setHandleInvalid('skip')
    scaler = StandardScaler(
        inputCol=i+"_vect", outputCol=i+"_scaled", withMean=True
    )
    pipeline = Pipeline(stages=[assembler, scaler])
    df_indexed = pipeline.fit(df_indexed).transform(df_indexed).withColumn(
        i+"_scaled", unlist(i+"_scaled")
    ).drop(i+"_vect")

Finally, we can run our last pipeline, with the OneHotEncoder. There is just one little obstacle here: the output is a SparseVector, and we want plain columns, since we are going to save the result to disk.

# Continuing inside my_transformations()

encode_pipeline = Pipeline(stages=[
    # dropLast=False keeps all four rating classes as one-hot columns
    OneHotEncoder(inputCol='ratingIndex', outputCol='rating_hot', dropLast=False)
])
df_encoded = encode_pipeline.fit(df_indexed).transform(df_indexed)

return df_encoded.withColumn("label", to_array(f.col("rating_hot"))).select(
    ['budget_scaled',
     'timestamp_scaled',
     'vote_count_scaled',
     'vote_average_scaled',
     'popularity_scaled',
     'runtime_scaled',
     'revenue_scaled',
     'pcIndex_scaled',
     'genreIndex_scaled',
     'kwIndex_scaled',
     'langIndex_scaled',
     'ratingIndex'] + [f.col("label")[i] for i in range(4)]
)

Notice that I use the function to_array from gist number three. Finally, I return to main and write to disk. Now that my script is done, I want to pop it into the MLOps platform, so I do “Create Dataset” and choose 4 standard workers (that’s a lot of compute power for this little princess dataset). I will create a full subset right away and do an 80/10/10 split on train/val/test. I also won’t calculate any column metrics, since I know they are normalized and scaled, and also because we are doing movie ratings. Here is the result:

Datasets
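As an aside, if you weren’t on the platform, the same 80/10/10 split is easy to do by hand in plain PySpark (the paths and seed below are just placeholders):

# Hedged sketch: an 80/10/10 train/val/test split done manually
train_df, val_df, test_df = result_df.randomSplit([0.8, 0.1, 0.1], seed=42)
train_df.write.mode("overwrite").parquet("s3://my-bucket/movies/train/")
val_df.write.mode("overwrite").parquet("s3://my-bucket/movies/val/")
test_df.write.mode("overwrite").parquet("s3://my-bucket/movies/test/")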

This bad boy took about 35 minutes to execute and cost me about a dollar. Good for me that I’m 100% certain this will generate a kick-ass classifier in the next part! If I’m interested, I can inspect the CloudWatch logs directly via the Logs button. I can also check out the PySpark code directly, which is nice if you are multiple people on a team working towards a common goal. You can then easily pick up where someone left off.

Code

Well, that was a handful. Next, we move on to training some models to see if we can build a fantastic ratings classifier.