Ungraded Lab (Optional): ETL Pipelines and Batch Predictions with Apache Beam and TensorFlow

In this lab, you will create, train, and evaluate a model using Apache Beam and TensorFlow, and then use it to make predictions. In particular, you will train a model to predict the molecular energy based on the number of carbon, hydrogen, oxygen, and nitrogen atoms.

This lab is marked as optional because you will not interact with Beam-based systems directly in future exercises, and other courses of this specialization use tools that abstract this layer. Nonetheless, it is worth becoming familiar with Beam, because it runs under the hood of TFX, the main ML pipelines framework that you will use in other labs. Seeing how these systems work will let you explore codebases that use Beam more freely, and even contribute features or bug fixes as you see fit. If you don't know the basics of Beam yet, we encourage you to look at the Minimal Word Count example here for a quick start and to use the Beam Programming Guide to look up concepts as needed.

The entire pipeline can be divided into four phases:

  1. Data extraction
  2. Preprocessing the data
  3. Training the model
  4. Doing predictions

You will focus particularly on Phase 2 (Preprocessing) and a bit of Phase 4 (Predictions) because these use Beam in its implementation.

Let's begin!

Note: This tutorial uses code, images, and discussion from this article. We highlighted a few key parts and updated some of the code to use more recent versions. We also focused on making the lab run locally. The original article linked above contains instructions for running it on GCP. Just note that doing so will incur costs that depend on the resources you use.

Initial setup

You will first download the scripts that you will use in the lab.

The molecules directory you downloaded mainly contains four scripts that encapsulate all phases of the workflow you will execute in this lab. The workflow is summarized in the figure below:

https://github.com/https-deeplearning-ai/machine-learning-engineering-for-production-public/raw/main/course4/week2-ungraded-labs/C4_W2_Lab_4_ETL_Beam/images/overview.png

It also contains these additional files and directories:

Note: In Google Colab, you need to restart the runtime at this point to finalize updating the packages you just installed. You can do so by clicking the Restart Runtime button at the end of the output cell above (after installation), or by selecting Runtime > Restart Runtime in the Menu bar. Please do not proceed to the next section without restarting.

Next, you'll define the working directory to contain all the results you will generate in this exercise. After each phase, you can open the Colab file explorer on the left and look under the results directory to see the new files and directories generated.

With that, you are now ready to execute the pipeline. As shown in the figure earlier, the logic is already implemented in the four main scripts. You will run them one by one, and the next sections will discuss the relevant details and the outputs generated.

Phase 1: Data extraction

The first step is to extract the input data. The dataset is stored as SDF files and is extracted from the National Center for Biotechnology Information (FTP source). Chapter 6 of this document shows a more detailed description of the SDF file format.

The data-extractor.py file extracts and decompresses the specified SDF files. In later steps, the example preprocesses these files and uses the data to train and evaluate the machine learning model. The file extracts the SDF files from the public source and stores them in a subdirectory inside the specified working directory.

As you can see here, the complete set of files is huge and can easily exceed storage limits in Colab. For this exercise, you will just download one file. You can use the script as shown in the cells below:

You should now have a new folder in your work directory called data. This will contain the SDF file you downloaded.

The SDF documentation linked earlier shows that each record is terminated by `$$$$`. You can use the command below to print the first record in the file. As you'll see, a single record is already quite long. In the next phase, you'll feed these records into a pipeline that transforms them into a form our model can consume.
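The record boundary rule is simple enough to sketch in Python. The helper below, `iter_sdf_records`, is a hypothetical illustration and is not part of the lab's scripts. It splits any iterable of lines, such as an open SDF file, on the `$$$$` terminator. The sample lines are made up.

```python
from typing import Iterable, Iterator, List


def iter_sdf_records(lines: Iterable[str]) -> Iterator[List[str]]:
    """Yield each SDF record as a list of lines, split on the '$$$$' terminator.

    Hypothetical helper for illustration; it is not one of the lab's scripts.
    """
    record: List[str] = []
    for line in lines:
        if line.strip() == '$$$$':
            # The terminator closes the current record and is not part of it.
            yield record
            record = []
        else:
            record.append(line)
    # Trailing lines without a terminator form an incomplete record;
    # only yield them if they contain something other than whitespace.
    if any(ln.strip() for ln in record):
        yield record


# Made-up lines standing in for the contents of an SDF file.
sample = ['3\n', '  -OEChem-\n', '\n', 'M  END\n', '$$$$\n',
          '4\n', 'M  END\n', '$$$$\n']
records = list(iter_sdf_records(sample))
print(len(records))
```

With a real file, you could pass the open file object directly. For example, `next(iter_sdf_records(f))` returns only the first record without reading the rest of the file.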

Phase 2: Preprocessing

The next script, preprocess.py, uses an Apache Beam pipeline to preprocess the data. The pipeline performs the following preprocessing actions:

  1. Reads and parses the extracted SDF files.
  2. Counts the number of different atoms in each of the molecules in the files.
  3. Normalizes the counts to values between 0 and 1 using tf.Transform.
  4. Partitions the dataset into a training dataset and an evaluation dataset.
  5. Writes the two datasets as TFRecord objects.

Apache Beam transforms can efficiently manipulate single elements at a time, but transforms that require a full pass of the dataset cannot easily be done with only Apache Beam and are better done using tf.Transform. Because of this, the code uses Apache Beam transforms to read and format the molecules, and to count the atoms in each molecule. The code then uses tf.Transform to find the global minimum and maximum counts in order to normalize the data.

The following image shows the steps in the pipeline.

https://github.com/https-deeplearning-ai/machine-learning-engineering-for-production-public/raw/main/course4/week2-ungraded-labs/C4_W2_Lab_4_ETL_Beam/images/etl.png

Run the preprocessing pipeline

You will run the script first, and the following sections will discuss the relevant parts of its code. It takes around 6 minutes to run.

You should now have a few more outputs in your work directory. Most important are:

The training and evaluation datasets contain TFRecords and you can view them by running the helper function in the cells below.

Note: From the output cell above, you might conclude that we need more than the atom counts to make good predictions. The counts are identical in both records, but the Energy values differ. Thus, you cannot expect the model to reach a low loss during the training phase later. For simplicity, we'll use only atom counts in this exercise, but feel free to revise it later to include more predictive features. You can share your findings in our Discourse community to discuss them with other learners who are interested in the same problem.

The PreprocessData object contains Python objects needed in the training phase, such as:

These objects were serialized to a file with dill when you ran the preprocess script earlier. You can deserialize that file using the cell below to view its contents.
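dill provides the same `dump`/`load` and `dumps`/`loads` interface as Python's standard `pickle` module, and it can also serialize objects that `pickle` rejects, such as lambdas. The sketch below uses `pickle` so that it runs without extra dependencies. `PreprocessDataSketch` and its fields are hypothetical stand-ins, not the lab's actual `PreprocessData` class.

```python
import pickle
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class PreprocessDataSketch:
    """Hypothetical stand-in for the lab's PreprocessData object.

    The field names are illustrative assumptions, not the real class.
    """
    input_feature_spec: Dict[str, str] = field(default_factory=dict)
    train_files_pattern: str = ''
    eval_files_pattern: str = ''


original = PreprocessDataSketch(
    input_feature_spec={'TotalC': 'int', 'Energy': 'float'},
    train_files_pattern='train/part*',   # hypothetical paths
    eval_files_pattern='eval/part*',
)

# Round-trip through bytes. dill.dumps / dill.loads accept the same arguments.
payload = pickle.dumps(original)
restored = pickle.loads(payload)
print(restored == original)
```

To read the lab's real file, you would open it in binary mode (`'rb'`) and call `dill.load` on the file object.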

The next sections will describe how these are implemented as a Beam pipeline in preprocess.py. You can open this file in a separate text editor so you can look at it more closely while reviewing the snippets below.

Applying element-based transforms

The preprocess.py code creates an Apache Beam pipeline.

```
# Build and run a Beam Pipeline
with beam.Pipeline(options=beam_options) as p, \
     beam_impl.Context(temp_dir=tft_temp_dir):
```

Next, the code applies a `feature_extraction` transform to the pipeline.

```
# Transform and validate the input data matches the input schema
dataset = (
    p
    | 'Feature extraction' >> feature_extraction
```

The pipeline uses `SimpleFeatureExtraction` as its `feature_extraction` transform.

```
pubchem.SimpleFeatureExtraction(pubchem.ParseSDF(data_files_pattern)),
```

The `SimpleFeatureExtraction` transform, defined in `pubchem/pipeline.py`, contains a series of transforms that process each element independently. First, the code parses the molecules from the source file. It then formats each molecule as a dictionary of molecule properties and finally counts the atoms in the molecule. These counts are the features (inputs) for the machine learning model.

```
import apache_beam as beam


class SimpleFeatureExtraction(beam.PTransform):
  """The feature extraction (element-wise transformations).

  We create a `PTransform` class. This `PTransform` is a bundle of
  transformations that can be applied to any other pipeline as a step.

  We'll extract all the raw features here. Due to the nature of `PTransform`s,
  we can only do element-wise transformations here. Anything that requires a
  full-pass of the data (such as feature scaling) has to be done with
  tf.Transform.
  """
  def __init__(self, source):
    super(SimpleFeatureExtraction, self).__init__()
    self.source = source

  def expand(self, p):
    # Return the preprocessing pipeline. In this case we're reading the PubChem
    # files, but the source could be any Apache Beam source.
    return (p
        | 'Read raw molecules' >> self.source
        | 'Format molecule' >> beam.ParDo(FormatMolecule())
        | 'Count atoms' >> beam.ParDo(CountAtoms())
    )
```

The read transform `beam.io.Read(pubchem.ParseSDF(data_files_pattern))` reads SDF files from a custom source. The custom source, called `ParseSDF`, is defined in `pubchem/pipeline.py`. `ParseSDF` extends `FileBasedSource` and implements the `read_records` function that opens the extracted SDF files. The pipeline groups the raw data into sections that hold the information needed for the next steps. Each section of a parsed SDF record is stored in a dictionary (see `pipeline/sdf.py`). The keys are the section names, and the values are the raw line contents of the corresponding section.

Next, the code applies `beam.ParDo(FormatMolecule())` to the pipeline. The `ParDo` applies the `DoFn` named `FormatMolecule` to each molecule, and `FormatMolecule` yields one dictionary per formatted molecule. The following snippet is an example of an element in the output `PCollection`:
```
{
  'atoms': [
    {
      'atom_atom_mapping_number': 0,
      'atom_stereo_parity': 0,
      'atom_symbol': u'O',
      'charge': 0,
      'exact_change_flag': 0,
      'h0_designator': 0,
      'hydrogen_count': 0,
      'inversion_retention': 0,
      'mass_difference': 0,
      'stereo_care_box': 0,
      'valence': 0,
      'x': -0.0782,
      'y': -1.5651,
      'z': 1.3894,
    },
    ...
  ],
  'bonds': [
    {
      'bond_stereo': 0,
      'bond_topology': 0,
      'bond_type': 1,
      'first_atom_number': 1,
      'reacting_center_status': 0,
      'second_atom_number': 5,
    },
    ...
  ],
  '<PUBCHEM_COMPOUND_CID>': ['3\n'],
  ...
  '<PUBCHEM_MMFF94_ENERGY>': ['19.4085\n'],
  ...
}
```
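The following minimal sketch makes the idea of "sections" concrete. It groups SDF data items (the `> <NAME>` blocks at the end of each record) into a dictionary keyed by section name, with the raw lines as values. `parse_data_items` is hypothetical and handles only data items, not the atom and bond blocks. The field names follow PubChem's SDF conventions, and the values mirror the sample above.

```python
import re
from typing import Dict, List, Optional


def parse_data_items(record_lines: List[str]) -> Dict[str, List[str]]:
    """Group SDF data items into {'<FIELD_NAME>': [raw value lines]}.

    Hypothetical sketch: it only handles the '> <NAME>' data-item sections and
    skips the atom and bond blocks that the lab's parser also decodes.
    """
    sections: Dict[str, List[str]] = {}
    current: Optional[str] = None
    for line in record_lines:
        if line.startswith('>'):
            # Header such as "> <PUBCHEM_COMPOUND_CID>": keep the <...> part as the key.
            match = re.search(r'<[^>]+>', line)
            current = match.group(0) if match else line[1:].strip()
            sections[current] = []
        elif current is not None:
            if line.strip() == '':
                # A blank line ends the current data item.
                current = None
            else:
                # Keep the raw line, newline included, like the sample output.
                sections[current].append(line)
    return sections


record = [
    'M  END\n',
    '> <PUBCHEM_COMPOUND_CID>\n',
    '3\n',
    '\n',
    '> <PUBCHEM_MMFF94_ENERGY>\n',
    '19.4085\n',
    '\n',
]
items = parse_data_items(record)
print(items)
```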

Then, the code applies `beam.ParDo(CountAtoms())` to the pipeline. The `CountAtoms` `DoFn` counts the carbon, hydrogen, nitrogen, and oxygen atoms in each molecule and emits one dictionary of features and label per molecule. The output of this step is therefore a `PCollection` of these dictionaries. Here is an example of an element in the output `PCollection`:

```
{
  'ID': 3,
  'TotalC': 7,
  'TotalH': 8,
  'TotalO': 4,
  'TotalN': 0,
  'Energy': 19.4085,
}
```
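The counting logic can be sketched without Beam. In the lab, it lives in a `DoFn`. Beam calls the `process` method of a `DoFn` once per element, and every value it yields becomes an element of the output `PCollection`. `CountAtomsSketch` is a hypothetical plain-Python stand-in. The data-item keys it reads (`<PUBCHEM_COMPOUND_CID>`, `<PUBCHEM_MMFF94_ENERGY>`) are assumptions based on PubChem's field names. The input reproduces the sample record above (4 O, 7 C, 8 H).

```python
from collections import Counter


class CountAtomsSketch:
    """Plain-Python sketch of the CountAtoms step (hypothetical, not the lab's code).

    Mirrors the shape of a beam.DoFn: process() receives one formatted
    molecule and yields one dictionary of features and label.
    """

    def process(self, molecule):
        counts = Counter(atom['atom_symbol'] for atom in molecule['atoms'])
        yield {
            # Assumed data-item keys; int()/float() ignore the trailing newline.
            'ID': int(molecule['<PUBCHEM_COMPOUND_CID>'][0]),
            # Counter returns 0 for symbols that never appear (e.g. N here).
            'TotalC': counts['C'],
            'TotalH': counts['H'],
            'TotalO': counts['O'],
            'TotalN': counts['N'],
            'Energy': float(molecule['<PUBCHEM_MMFF94_ENERGY>'][0]),
        }


molecule = {
    'atoms': [{'atom_symbol': s} for s in ['O'] * 4 + ['C'] * 7 + ['H'] * 8],
    '<PUBCHEM_COMPOUND_CID>': ['3\n'],
    '<PUBCHEM_MMFF94_ENERGY>': ['19.4085\n'],
}
features = next(CountAtomsSketch().process(molecule))
print(features)
```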

The pipeline then validates the inputs. The `ValidateInputData` `DoFn` checks that every element matches the input feature spec (`input_feature_spec`). This validation ensures that the data is in the correct format when it's fed into TensorFlow.

```
    | 'Validate inputs' >> beam.ParDo(ValidateInputData(
        input_feature_spec)))
```
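As a rough illustration of what an element-wise validation step can check, here is a hedged sketch. The lab's `ValidateInputData` works with a TensorFlow feature spec. This hypothetical `ValidateInputDataSketch` replaces that spec with a plain `{feature_name: python_type}` mapping so that it runs without TensorFlow or Beam. The sketch raises an error on a mismatch; the snippet above does not show whether the lab's `DoFn` raises or drops bad elements.

```python
class ValidateInputDataSketch:
    """Element-wise validation against a simplified feature spec (hypothetical).

    The lab's ValidateInputData receives a TensorFlow feature spec; this sketch
    swaps that for a plain {feature_name: python_type} mapping.
    """

    def __init__(self, feature_spec):
        self.feature_spec = feature_spec

    def process(self, elem):
        if not isinstance(elem, dict):
            raise ValueError(f'expected a dict, got {type(elem).__name__}')
        missing = sorted(set(self.feature_spec) - set(elem))
        unexpected = sorted(set(elem) - set(self.feature_spec))
        if missing or unexpected:
            raise ValueError(f'missing={missing}, unexpected={unexpected}')
        for name, expected_type in self.feature_spec.items():
            if not isinstance(elem[name], expected_type):
                raise ValueError(
                    f'{name}: expected {expected_type.__name__}, '
                    f'got {type(elem[name]).__name__}')
        # Valid elements pass through unchanged, as in a Beam DoFn.
        yield elem


spec = {'ID': int, 'TotalC': int, 'TotalH': int, 'TotalO': int,
        'TotalN': int, 'Energy': float}
validator = ValidateInputDataSketch(spec)
good = {'ID': 3, 'TotalC': 7, 'TotalH': 8, 'TotalO': 4, 'TotalN': 0,
        'Energy': 19.4085}
passed = list(validator.process(good))

try:
    # A string label should be rejected. process() is a generator, so the
    # checks only run once its output is consumed.
    list(validator.process({**good, 'Energy': '19.4085'}))
    rejected = False
except ValueError:
    rejected = True
```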

Applying full-pass transforms

The Molecules code sample uses a Deep Neural Network Regressor to make predictions. The general recommendation is to normalize the inputs before feeding them into the ML model. The pipeline uses tf.Transform to normalize the counts of each atom to values between 0 and 1. To read more about normalizing inputs, see feature scaling.

Normalizing the values requires a full pass through the dataset, recording the minimum and maximum values. The code uses tf.Transform to go through the entire dataset and apply full-pass transforms.
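Min-max scaling maps each value x to (x - min) / (max - min). No single element knows the dataset-wide min and max, so an element-wise transform cannot compute them. The sketch below makes the two passes explicit: one pass analyzes the data, and the next pass transforms it. `scale_to_0_1` here is a hypothetical in-memory illustration of the formula, not tf.Transform's implementation, which computes the statistics as a distributed Beam job.

```python
from typing import Dict, List


def scale_to_0_1(rows: List[Dict[str, float]], key: str) -> List[float]:
    """Min-max scale one feature to [0, 1] using two passes over the data."""
    # Pass 1 (analyze): global statistics over the whole dataset.
    values = [row[key] for row in rows]
    lo, hi = min(values), max(values)
    # Pass 2 (transform): apply the same constants to every element.
    if hi == lo:
        # This sketch's own choice for a constant column; tf.Transform
        # handles this edge case in its own way.
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


rows = [{'TotalC': 2}, {'TotalC': 7}, {'TotalC': 12}]
print(scale_to_0_1(rows, 'TotalC'))
```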

To use tf.Transform, the code must provide a function that contains the logic of the transform to perform on the dataset. In preprocess.py, the code uses the AnalyzeAndTransformDataset transform provided by tf.Transform. Learn more about how to use tf.Transform.

```
# Apply the tf.Transform preprocessing_fn
input_metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(input_feature_spec))

dataset_and_metadata, transform_fn = (
    (dataset, input_metadata)
    | 'Feature scaling' >> beam_impl.AnalyzeAndTransformDataset(
        feature_scaling))
dataset, metadata = dataset_and_metadata
```

In preprocess.py, the feature_scaling function used is normalize_inputs, which is defined in pubchem/pipeline.py. The function uses the tf.Transform function scale_to_0_1 to normalize the counts to values between 0 and 1.

```
import tensorflow_transform as tft


def normalize_inputs(inputs):
  """Preprocessing function for tf.Transform (full-pass transformations).

  Here we will do any preprocessing that requires a full-pass of the dataset.
  It takes as inputs the preprocessed data from the `PTransform` we specify, in
  this case `SimpleFeatureExtraction`.

  Common operations might be scaling values to 0-1, getting the minimum or
  maximum value of a certain field, creating a vocabulary for a string field.

  There are two main types of transformations supported by tf.Transform, for
  more information, check the following modules:
    - analyzers: tensorflow_transform.analyzers.py
    - mappers:   tensorflow_transform.mappers.py

  Any transformation done in tf.Transform will be embedded into the TensorFlow
  model itself.
  """
  return {
      # Scale the input features for normalization
      'NormalizedC': tft.scale_to_0_1(inputs['TotalC']),
      'NormalizedH': tft.scale_to_0_1(inputs['TotalH']),
      'NormalizedO': tft.scale_to_0_1(inputs['TotalO']),
      'NormalizedN': tft.scale_to_0_1(inputs['TotalN']),

      # Do not scale the label since we want the absolute number for prediction
      'Energy': inputs['Energy'],
  }
```

Partitioning the dataset

Next, the preprocess.py pipeline partitions the single dataset into two datasets. It allocates approximately 80% of the data to be used as training data, and approximately 20% of the data to be used as evaluation data.

```
import random

import apache_beam as beam

# Split the dataset into a training set and an evaluation set
assert 0 < eval_percent < 100, 'eval_percent must be in the range (0-100)'
train_dataset, eval_dataset = (
    dataset
    # The lambda returns 1 (eval) with probability eval_percent/100, else 0 (train)
    | 'Split dataset' >> beam.Partition(
        lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2))
```
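`beam.Partition` calls the partition function as `fn(element, num_partitions)` and returns a list of PCollections ordered by the index that the function returns. Partition 0 therefore becomes `train_dataset`, and partition 1 becomes `eval_dataset`. The sketch below applies the lambda's rule in plain Python to show that the split is only approximately 80/20. It seeds the random number generator for repeatability, which the lab does not do. `partition_fn` is a hypothetical name.

```python
import random


def partition_fn(elem, num_partitions, eval_percent=20):
    """Same rule as the lambda above: 1 (eval) with probability eval_percent%, else 0 (train)."""
    return int(random.uniform(0, 100) < eval_percent)


random.seed(0)  # seeded only so this demo is repeatable
assignments = [partition_fn(i, 2) for i in range(10_000)]
eval_share = sum(assignments) / len(assignments)
print(f'eval share: {eval_share:.3f}')  # close to, but not exactly, 0.20
```

Because the rule draws a fresh random number for each element, rerunning the pipeline produces a different split. If you need a reproducible split, a common alternative is to hash a stable key such as the molecule ID. That variant is not what preprocess.py does.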

Writing the output

Finally, the preprocess.py pipeline writes the two datasets (training and evaluation) using the WriteToTFRecord transform.

```
# Write the datasets as TFRecords
coder = example_proto_coder.ExampleProtoCoder(metadata.schema)

train_dataset_prefix = os.path.join(train_dataset_dir, 'part')
_ = (
    train_dataset
    | 'Write train dataset' >> tfrecordio.WriteToTFRecord(
        train_dataset_prefix, coder))

eval_dataset_prefix = os.path.join(eval_dataset_dir, 'part')
_ = (
    eval_dataset
    | 'Write eval dataset' >> tfrecordio.WriteToTFRecord(
        eval_dataset_prefix, coder))

# Write the transform_fn
_ = (
    transform_fn
    | 'Write transformFn' >> transform_fn_io.WriteTransformFn(work_dir))
```

Phase 3: Training

Recall that at the end of the preprocessing phase, the code split the data into two datasets (training and evaluation).

The script uses a simple dense neural network for the regression problem. The trainer/task.py file contains the code for training the model. The main function of trainer/task.py loads the parameters needed from the preprocessing phase and passes them to the task runner function (i.e., run_fn).

In this exercise, we will not focus much on the training metrics (e.g., the loss). Those are discussed in other courses of this specialization. The main objective is to look at the outputs and how they connect to the prediction phase.

The outputs of this phase are in the model directory. This will be the trained model that you will use for predictions.

The important thing to note in the training script is that it also exports the transformation graph with the model, as shown in these lines:

```
# Define default serving signature
signatures = {
    'serving_default':
        _get_serve_tf_examples_fn(model, tf_transform_output,
                                  input_feature_spec).get_concrete_function(
                                      [signatures_dict])
}

# Save model with signature
model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures,
           include_optimizer=False)
```

The implementation of _get_serve_tf_examples_fn() is as follows:

```
def _get_serve_tf_examples_fn(model, tf_transform_output, feature_spec):
  """Returns a function that applies data transformation and generates predictions"""

  # Get transformation graph
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(inputs_list):
    """Returns the output to be used in the serving signature."""

    # Create a shallow copy of the dictionary in the single element list
    inputs = inputs_list[0].copy()

    # Pop ID since it is not needed in the transformation graph
    # Also needed to identify predictions
    id_key = inputs.pop('ID')

    # Apply data transformation to the raw inputs
    transformed = model.tft_layer(inputs)

    # Pass the transformed data to the model to get predictions
    predictions = model(transformed.values())

    return id_key, predictions

  return serve_tf_examples_fn
```

Using model.tft_layer means that your model can accept raw data and will apply the transformation itself before making predictions. When you serve your model, you therefore don't have to build a separate pipeline to transform incoming data, because the model does that for you through this serving function. This also helps prevent training-serving skew, since the training and serving data are handled the same way.
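The sketch below shows the idea of bundling the transform with the model in plain Python. The scaling constants learned during the analyze pass are stored alongside the predictor. Raw inputs go in, `ID` is popped and passed through, and the same scaling used in training is applied before predicting. `ServingModelSketch`, the min/max values, and the linear predictor are all hypothetical stand-ins for `tft_layer` and the trained Keras model.

```python
from typing import Callable, Dict, Tuple


class ServingModelSketch:
    """Bundles training-time transform constants with a predictor (hypothetical)."""

    def __init__(self, min_max: Dict[str, Tuple[float, float]],
                 predictor: Callable[[Dict[str, float]], float]):
        self.min_max = min_max      # {raw_feature: (min, max)} from the analyze pass
        self.predictor = predictor  # stands in for the trained model

    def __call__(self, raw: Dict[str, float]):
        inputs = dict(raw)           # shallow copy, like inputs_list[0].copy()
        id_key = inputs.pop('ID')    # ID is passed through, not transformed
        transformed = {
            name: 0.0 if hi == lo else (inputs[name] - lo) / (hi - lo)
            for name, (lo, hi) in self.min_max.items()
        }
        return id_key, self.predictor(transformed)


serving = ServingModelSketch(
    min_max={'TotalC': (0, 10), 'TotalH': (0, 20)},             # made-up constants
    predictor=lambda x: 10 * x['TotalC'] + 5 * x['TotalH'],     # made-up weights
)
raw = {'ID': 3, 'TotalC': 7, 'TotalH': 8}
result = serving(raw)
print(result)
```

The caller never sees min or max. That is the point of exporting the transform with the model: serving code cannot accidentally use different constants from the ones used in training.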

Phase 4: Prediction

After training the model, you can provide the model with inputs and it will make predictions. The pipeline in predict.py is responsible for making predictions. It reads the input files from the custom source and writes the output predictions as text files to the specified working directory.

```
if args.verb == 'batch':
  data_files_pattern = os.path.join(args.inputs_dir, '*.sdf')
  results_prefix = os.path.join(args.outputs_dir, 'part')
  source = pubchem.ParseSDF(data_files_pattern)
  sink = beam.io.WriteToText(results_prefix)
```

The following image shows the steps in the prediction pipeline:

https://github.com/https-deeplearning-ai/machine-learning-engineering-for-production-public/raw/main/course4/week2-ungraded-labs/C4_W2_Lab_4_ETL_Beam/images/predict.png

In `predict.py`, the code defines the pipeline in the `run` function:

```
def run(model_dir, feature_extraction, sink, beam_options=None):
  print('Listening...')
  with beam.Pipeline(options=beam_options) as p:
    _ = (p
        | 'Feature extraction' >> feature_extraction
        | 'Predict' >> beam.ParDo(Predict(model_dir, 'ID'))
        | 'Format as JSON' >> beam.Map(json.dumps)
        | 'Write predictions' >> sink)
```

The code calls the `run` function with the following parameters:

```
run(
    args.model_dir,
    pubchem.SimpleFeatureExtraction(source),
    sink,
    beam_options)
```

First, the code passes the `pubchem.SimpleFeatureExtraction(source)` transform as the `feature_extraction` transform. This is the same `SimpleFeatureExtraction` class shown earlier in the preprocessing phase, so the prediction pipeline reads and featurizes molecules with exactly the same element-wise steps used for the training data.

The transform reads from the appropriate source based on the pipeline’s execution mode (i.e. batch), formats the molecules, and counts the different atoms in each molecule.

Next, the pipeline applies `beam.ParDo(Predict(…))`, which performs the prediction. `Predict`, the `DoFn` passed to `ParDo`, uses each dictionary of input features (atom counts) to predict the molecular energy.

The next transform applied to the pipeline is `beam.Map(json.dumps)`, which takes each prediction result dictionary and serializes it into a JSON string. Finally, the output is written to the sink.
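The sketch below shows the last two steps together: a prediction step that loads its model once, followed by JSON formatting. Beam's `DoFn.setup()` runs once per `DoFn` instance before any elements are processed, which makes it a natural place to load a model. `PredictSketch`, the `load_model` callable, and the `'id'`/`'predictions'` output keys are hypothetical. The sketch does not show the lab's actual `Predict` implementation or output format, and it runs in plain Python without Beam.

```python
import json


class PredictSketch:
    """Hypothetical prediction step that loads its model once, DoFn-style."""

    def __init__(self, load_model, id_key='ID'):
        self.load_model = load_model  # stand-in for loading a SavedModel
        self.id_key = id_key
        self.model = None

    def setup(self):
        # In Beam, setup() is called once per DoFn instance, not per element.
        self.model = self.load_model()

    def process(self, features):
        inputs = dict(features)
        uid = inputs.pop(self.id_key)
        yield {'id': uid, 'predictions': self.model(inputs)}


predict = PredictSketch(load_model=lambda: (lambda x: 0.5 * x['TotalC']))
predict.setup()  # Beam would call this for you
# Equivalent of the 'Format as JSON' step: one JSON string per result.
lines = [json.dumps(r) for r in predict.process({'ID': 3, 'TotalC': 7})]
print(lines)
```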

Batch predictions

Batch predictions are optimized for throughput rather than latency. They work best when you're making many predictions and can wait for all of them to finish before getting the results. You can run the following cells to use the script for batch predictions. For simplicity, you will use the same file you used for training. If you want, however, you can use the data extractor script from earlier to grab a different SDF file and feed it here.

The results should now be in the predictions folder. This is just a text file so you can easily print the output.
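If you would rather inspect the results in Python than print the file, the sketch below reads every shard that `WriteToText` produced. By default, Beam appends a shard suffix such as `-00000-of-00001` to the prefix, which is why the sketch globs `part*`. `read_predictions` is hypothetical. The JSON content in the demo is made up, because the real keys depend on what `Predict` emits.

```python
import glob
import json
import os
import tempfile


def read_predictions(outputs_dir):
    """Read every JSON line from the 'part*' text shards under outputs_dir.

    Assumes one JSON object per line, as produced by json.dumps + WriteToText.
    """
    results = []
    for path in sorted(glob.glob(os.path.join(outputs_dir, 'part*'))):
        with open(path) as f:
            results.extend(json.loads(line) for line in f if line.strip())
    return results


# Demo with a fake shard in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, 'part-00000-of-00001'), 'w') as f:
        f.write('{"id": 3, "predictions": 3.5}\n')
    preds = read_predictions(tmp)
print(preds)
```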

Wrap Up

You've now completed all phases of the Beam-based pipeline! Higher-level frameworks such as TFX run similar processes under the hood, and you can use the techniques here to understand their codebases better or to extend them for your own needs. As mentioned earlier, the original article also covers running on GCP and performing online predictions. Feel free to try it out, but be aware of the recurring costs.

On to the next part of the course!