Preprocessing

class mlops.processing.spark.SparkProcessor()

Main object for running PySpark processing jobs in MLOps.

read(self, database_name, table_name)

Read data from the catalog created by MLOps. This call should generally be used exactly as generated by the console, since table names are UUID strings.

Arguments:

database_name: str - the name of the Glue Database
table_name: str - the table name inside the Glue Database

Example:

mlops = SparkProcessor()
# Read in the first datasource selected in the console
df = mlops.read(mlops.databases[0], mlops.tables[0])
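
The databases and tables attributes hold the catalog names for the datasources selected in the console, in selection order. A short sketch, assuming a second datasource was also selected:

# Read in the second datasource selected in the console
df_other = mlops.read(mlops.databases[1], mlops.tables[1])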
read_undiscovered(self, paths, data_format, compression=None, recursive=True, header=True, separator=',', skip_first=False, quote_char='"', avro_version='1.8', json_path=None)

Read data residing on S3 into a Spark DataFrame.

Arguments:

paths: list - a list of S3 paths to objects.
data_format: str - possible values are ["csv", "json", "parquet"]
compression: str - the compression type of the files; possible values are ["snappy", "gzip"]
recursive: bool - whether to search for files recursively under each path
header: bool - whether the records have headers (CSV only)
separator: str - the separator for the records (CSV only)
skip_first: bool - whether to skip the first row (CSV only)
quote_char: str - the quote character used in the records (CSV only)
avro_version: str - the Avro schema version; possible values are ["1.7", "1.8"]
json_path: str - the object in the JSON records to read (if None, reads all)

Example:

mlops = SparkProcessor()
df = mlops.read_undiscovered(paths=['s3://my_bucket/my_path'], data_format='csv', header=True)
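
The same call also reads other formats. A hedged sketch for gzip-compressed JSON, where the S3 path and the json_path value are illustrative:

mlops = SparkProcessor()
# Read gzip-compressed JSON objects, keeping only the 'records' element of each document
df_json = mlops.read_undiscovered(paths=['s3://my_bucket/my_json_path'], data_format='json', compression='gzip', json_path='records')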
write(self, df, label_columns, coalesce=False, partitions=None, output_format='parquet', header=True)

Write DataFrame to S3.

Arguments:

df: DataFrame - the Spark DataFrame to save.
label_columns: list[str] - list with the names of the label columns.
coalesce: bool - if set to True, Spark will save all data into a single file (this can take a long time and may cause out-of-memory errors).
partitions: list[str] - if not None, Spark will save the data partitioned by these columns.
output_format: str - the data format to save the data in. Valid values are ["csv", "parquet"].
header: bool - whether the output data should have headers (CSV only).

Example:

mlops = SparkProcessor()
df = mlops.read(mlops.databases[0], mlops.tables[0])
df_result = my_transformations(df)
mlops.write(df_result, label_columns=['my_label_column'], coalesce=False, partitions=['month', 'day', 'hour'], output_format='parquet')
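
If a single CSV file with a header row is needed instead, a hedged variant of the same call (note the caution above about coalesce on large data):

mlops.write(df_result, label_columns=['my_label_column'], coalesce=True, output_format='csv', header=True)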
class mlops.processing.python.PythonProcessor()

Main object for running scikit-learn/pandas processing jobs in MLOps.

parse_result(self, qry_id)

read(self, database, table, qry=None)
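
A minimal usage sketch for read, assuming it behaves like SparkProcessor.read against the MLOps catalog and returns a pandas DataFrame; the database and table names below are illustrative placeholders:

mlops = PythonProcessor()
# Read a console-generated catalog table into a DataFrame
df = mlops.read('my_glue_database', 'my_uuid_table_name')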
read_undiscovered(self, paths, data_format, compression=None, header=True, separator=',', skip_first=False, quote_char='"')

Read data residing on S3 into a pandas DataFrame.

Arguments:

paths: list - a list of S3 paths to objects.
data_format: str - possible values are ["csv", "json", "parquet"]
compression: str - the compression type of the files; possible values are ["snappy", "gzip"]
header: bool - whether the records have headers (CSV only)
separator: str - the separator for the records (CSV only)
skip_first: bool - whether to skip the first row (CSV only)
quote_char: str - the quote character used in the records (CSV only)

Example:

mlops = PythonProcessor()
df = mlops.read_undiscovered(paths=['s3://my_bucket/my_path'], data_format='csv', header=True)

write(self, df, label_columns, header=True, output_format='csv')
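
A minimal usage sketch, assuming this mirrors SparkProcessor.write and saves the DataFrame to S3; the names below are illustrative placeholders:

mlops = PythonProcessor()
df = mlops.read('my_glue_database', 'my_uuid_table_name')
df_result = my_transformations(df)
# Save the result as CSV with a header row
mlops.write(df_result, label_columns=['my_label_column'], output_format='csv', header=True)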