In this lab, you will create, train, evaluate, and make predictions on a model using Apache Beam and TensorFlow. In particular, you will train a model to predict the molecular energy based on the number of carbon, hydrogen, oxygen, and nitrogen atoms.
This lab is marked as optional because you will not be interacting with Beam-based systems directly in future exercises. Other courses of this specialization also use tools that abstract this layer. Nonetheless, it is good to be familiar with it since it is used under the hood by TFX, the main ML pipeline framework that you will use in other labs. Seeing how these systems work will let you explore other codebases that use this tool more freely and even make contributions or bug fixes as you see fit. If you don't know the basics of Beam yet, we encourage you to look at the Minimal Word Count example here for a quick start and use the Beam Programming Guide to look up concepts if needed.
The entire pipeline can be divided into four phases:
1. Data extraction
2. Preprocessing
3. Training
4. Predictions
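To get a quick feel for the Beam programming model before diving in, here is a minimal, word-count-style sketch (not the linked example itself) that runs locally with the default DirectRunner; all names here are illustrative:
import apache_beam as beam

# Minimal Beam pipeline: create some lines, split into words, count, print.
with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | 'Create lines' >> beam.Create(['to be or not to be'])
        | 'Split into words' >> beam.FlatMap(lambda line: line.split())
        | 'Count per word' >> beam.combiners.Count.PerElement()
        | 'Print results' >> beam.Map(print)
    )
Each step is a named transform applied with the pipe operator, which is the same pattern you will see throughout preprocess.py and predict.py.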
You will focus particularly on Phase 2 (Preprocessing) and a bit of Phase 4 (Predictions) because these use Beam in its implementation.
Let's begin!
Note: This tutorial uses code, images, and discussion from this article. We highlighted a few key parts and updated some of the code to use more recent versions. We also focused on making the lab run locally. The original article linked above contains instructions for running it in GCP. Just take note that doing so will have associated costs depending on the resources you use.
You will first download the scripts that you will use in the lab.
# Download the scripts
!wget https://github.com/https-deeplearning-ai/machine-learning-engineering-for-production-public/raw/main/course4/week2-ungraded-labs/C4_W2_Lab_4_ETL_Beam/data/molecules.tar.gz
# Unzip the archive
!tar -xvzf molecules.tar.gz
--2022-09-05 03:50:10--  https://github.com/https-deeplearning-ai/machine-learning-engineering-for-production-public/raw/main/course4/week2-ungraded-labs/C4_W2_Lab_4_ETL_Beam/data/molecules.tar.gz
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/https-deeplearning-ai/machine-learning-engineering-for-production-public/main/course4/week2-ungraded-labs/C4_W2_Lab_4_ETL_Beam/data/molecules.tar.gz [following]
--2022-09-05 03:50:11--  https://raw.githubusercontent.com/https-deeplearning-ai/machine-learning-engineering-for-production-public/main/course4/week2-ungraded-labs/C4_W2_Lab_4_ETL_Beam/data/molecules.tar.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10651 (10K) [application/octet-stream]
Saving to: ‘molecules.tar.gz’

molecules.tar.gz    100%[===================>]  10.40K  --.-KB/s    in 0s

2022-09-05 03:50:11 (65.7 MB/s) - ‘molecules.tar.gz’ saved [10651/10651]

molecules/
molecules/requirements.txt
molecules/pubchem/
molecules/pubchem/pipeline.py
molecules/pubchem/sdf.py
molecules/pubchem/__init__.py
molecules/trainer/
molecules/trainer/task.py
molecules/trainer/__init__.py
molecules/data-extractor.py
molecules/preprocess.py
molecules/predict.py
The molecules directory you downloaded mainly contains 4 scripts that encapsulate all phases of the workflow you will execute in this lab. It is summarized by the figure below:

In addition, it contains these other files and directories:
pubchem - subdirectory which contains common modules (i.e. pipeline.py and sdf.py) shared by the preprocessing and prediction phases. If you look at preprocess.py and predict.py, you can see the pubchem package imported at the top.
requirements.txt - contains the packages to install in this Colab: Apache Beam and TensorFlow Transform. These allow you to create Extract-Transform-Load (ETL) pipelines and preprocess data. Let's install them in the next cell.
# Install required packages
!pip install -r ./molecules/requirements.txt
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting apache-beam
Downloading apache_beam-2.41.0-cp37-cp37m-manylinux2010_x86_64.whl (10.9 MB)
|████████████████████████████████| 10.9 MB 5.7 MB/s
Collecting tensorflow-transform
Downloading tensorflow_transform-1.10.1-py3-none-any.whl (439 kB)
|████████████████████████████████| 439 kB 19.8 MB/s
Requirement already satisfied: protobuf<4,>=3.12.2 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (3.17.3)
Requirement already satisfied: httplib2<0.21.0,>=0.8 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (0.17.4)
Collecting proto-plus<2,>=1.7.1
Downloading proto_plus-1.22.1-py3-none-any.whl (47 kB)
|████████████████████████████████| 47 kB 3.3 MB/s
Requirement already satisfied: pydot<2,>=1.2.0 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (1.3.0)
Requirement already satisfied: python-dateutil<3,>=2.8.0 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (2.8.2)
Requirement already satisfied: crcmod<2.0,>=1.7 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (1.7)
Requirement already satisfied: typing-extensions>=3.7.0 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (4.1.1)
Requirement already satisfied: pyarrow<8.0.0,>=0.15.1 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (6.0.1)
Collecting orjson<4.0
Downloading orjson-3.8.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (270 kB)
|████████████████████████████████| 270 kB 21.7 MB/s
Collecting dill<0.3.2,>=0.3.1.1
Downloading dill-0.3.1.1.tar.gz (151 kB)
|████████████████████████████████| 151 kB 44.6 MB/s
Requirement already satisfied: grpcio<2,>=1.33.1 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (1.47.0)
Requirement already satisfied: numpy<1.23.0,>=1.14.3 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (1.21.6)
Collecting pymongo<4.0.0,>=3.8.0
Downloading pymongo-3.12.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (508 kB)
|████████████████████████████████| 508 kB 49.0 MB/s
Collecting requests<3.0.0,>=2.24.0
Downloading requests-2.28.1-py3-none-any.whl (62 kB)
|████████████████████████████████| 62 kB 1.5 MB/s
Collecting cloudpickle<3,>=2.1.0
Downloading cloudpickle-2.1.0-py3-none-any.whl (25 kB)
Requirement already satisfied: pytz>=2018.3 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (2022.2.1)
Collecting hdfs<3.0.0,>=2.1.0
Downloading hdfs-2.7.0-py3-none-any.whl (34 kB)
Collecting fastavro<2,>=0.23.6
Downloading fastavro-1.6.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.4 MB)
|████████████████████████████████| 2.4 MB 51.4 MB/s
Requirement already satisfied: six>=1.5.2 in /usr/local/lib/python3.7/dist-packages (from grpcio<2,>=1.33.1->apache-beam->-r ./molecules/requirements.txt (line 1)) (1.15.0)
Collecting docopt
Downloading docopt-0.6.2.tar.gz (25 kB)
Collecting protobuf<4,>=3.12.2
Downloading protobuf-3.20.1-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.0 MB)
|████████████████████████████████| 1.0 MB 39.2 MB/s
Requirement already satisfied: pyparsing>=2.1.4 in /usr/local/lib/python3.7/dist-packages (from pydot<2,>=1.2.0->apache-beam->-r ./molecules/requirements.txt (line 1)) (3.0.9)
Requirement already satisfied: certifi>=2017.4.17 in /usr/local/lib/python3.7/dist-packages (from requests<3.0.0,>=2.24.0->apache-beam->-r ./molecules/requirements.txt (line 1)) (2022.6.15)
Requirement already satisfied: charset-normalizer<3,>=2 in /usr/local/lib/python3.7/dist-packages (from requests<3.0.0,>=2.24.0->apache-beam->-r ./molecules/requirements.txt (line 1)) (2.1.1)
Requirement already satisfied: idna<4,>=2.5 in /usr/local/lib/python3.7/dist-packages (from requests<3.0.0,>=2.24.0->apache-beam->-r ./molecules/requirements.txt (line 1)) (2.10)
Requirement already satisfied: urllib3<1.27,>=1.21.1 in /usr/local/lib/python3.7/dist-packages (from requests<3.0.0,>=2.24.0->apache-beam->-r ./molecules/requirements.txt (line 1)) (1.24.3)
Requirement already satisfied: absl-py<2.0.0,>=0.9 in /usr/local/lib/python3.7/dist-packages (from tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.2.0)
Collecting tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5
Downloading tensorflow-2.9.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (511.8 MB)
|████████████████████████████████| 511.8 MB 8.2 kB/s
Collecting tfx-bsl<1.11.0,>=1.10.1
Downloading tfx_bsl-1.10.1-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (21.6 MB)
|████████████████████████████████| 21.6 MB 1.3 MB/s
Requirement already satisfied: tensorflow-metadata<1.11.0,>=1.10.0 in /usr/local/lib/python3.7/dist-packages (from tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.10.0)
Collecting google-cloud-spanner<2,>=1.13.0
Downloading google_cloud_spanner-1.19.3-py2.py3-none-any.whl (255 kB)
|████████████████████████████████| 255 kB 68.5 MB/s
Collecting google-cloud-videointelligence<2,>=1.8.0
Downloading google_cloud_videointelligence-1.16.3-py2.py3-none-any.whl (183 kB)
|████████████████████████████████| 183 kB 69.8 MB/s
Collecting google-cloud-dlp<4,>=3.0.0
Downloading google_cloud_dlp-3.8.1-py2.py3-none-any.whl (119 kB)
|████████████████████████████████| 119 kB 69.8 MB/s
Collecting google-auth-httplib2<0.2.0,>=0.1.0
Downloading google_auth_httplib2-0.1.0-py2.py3-none-any.whl (9.3 kB)
Requirement already satisfied: google-cloud-bigquery<3,>=1.6.0 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (1.21.0)
Collecting google-cloud-vision<2,>=0.38.0
Downloading google_cloud_vision-1.0.2-py2.py3-none-any.whl (435 kB)
|████████████████████████████████| 435 kB 54.0 MB/s
Collecting google-cloud-recommendations-ai<0.8.0,>=0.1.0
Downloading google_cloud_recommendations_ai-0.7.1-py2.py3-none-any.whl (148 kB)
|████████████████████████████████| 148 kB 57.3 MB/s
Collecting google-cloud-bigtable<2,>=0.31.1
Downloading google_cloud_bigtable-1.7.2-py2.py3-none-any.whl (267 kB)
|████████████████████████████████| 267 kB 56.0 MB/s
Collecting google-cloud-language<2,>=1.3.0
Downloading google_cloud_language-1.3.2-py2.py3-none-any.whl (83 kB)
|████████████████████████████████| 83 kB 2.1 MB/s
Collecting google-apitools<0.5.32,>=0.5.31
Downloading google-apitools-0.5.31.tar.gz (173 kB)
|████████████████████████████████| 173 kB 77.0 MB/s
Requirement already satisfied: google-api-core!=2.8.2,<3 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (1.31.6)
Requirement already satisfied: google-auth<3,>=1.18.0 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (1.35.0)
Collecting google-cloud-pubsub<3,>=2.1.0
Downloading google_cloud_pubsub-2.13.6-py2.py3-none-any.whl (235 kB)
|████████████████████████████████| 235 kB 54.0 MB/s
Collecting google-cloud-pubsublite<2,>=1.2.0
Downloading google_cloud_pubsublite-1.4.3-py2.py3-none-any.whl (267 kB)
|████████████████████████████████| 267 kB 76.7 MB/s
Requirement already satisfied: google-cloud-datastore<2,>=1.8.0 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (1.8.0)
Collecting google-cloud-bigquery-storage<2.14,>=2.6.3
Downloading google_cloud_bigquery_storage-2.13.2-py2.py3-none-any.whl (180 kB)
|████████████████████████████████| 180 kB 65.2 MB/s
Requirement already satisfied: google-cloud-core<3,>=0.28.1 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (1.0.3)
Collecting grpcio-gcp<1,>=0.2.2
Downloading grpcio_gcp-0.2.2-py2.py3-none-any.whl (9.4 kB)
Requirement already satisfied: cachetools<5,>=3.1.0 in /usr/local/lib/python3.7/dist-packages (from apache-beam->-r ./molecules/requirements.txt (line 1)) (4.2.4)
Requirement already satisfied: packaging>=14.3 in /usr/local/lib/python3.7/dist-packages (from google-api-core!=2.8.2,<3->apache-beam->-r ./molecules/requirements.txt (line 1)) (21.3)
Requirement already satisfied: setuptools>=40.3.0 in /usr/local/lib/python3.7/dist-packages (from google-api-core!=2.8.2,<3->apache-beam->-r ./molecules/requirements.txt (line 1)) (57.4.0)
Requirement already satisfied: googleapis-common-protos<2.0dev,>=1.6.0 in /usr/local/lib/python3.7/dist-packages (from google-api-core!=2.8.2,<3->apache-beam->-r ./molecules/requirements.txt (line 1)) (1.56.4)
Collecting fasteners>=0.14
Downloading fasteners-0.17.3-py3-none-any.whl (18 kB)
Requirement already satisfied: oauth2client>=1.4.12 in /usr/local/lib/python3.7/dist-packages (from google-apitools<0.5.32,>=0.5.31->apache-beam->-r ./molecules/requirements.txt (line 1)) (4.1.3)
Requirement already satisfied: rsa<5,>=3.1.4 in /usr/local/lib/python3.7/dist-packages (from google-auth<3,>=1.18.0->apache-beam->-r ./molecules/requirements.txt (line 1)) (4.9)
Requirement already satisfied: pyasn1-modules>=0.2.1 in /usr/local/lib/python3.7/dist-packages (from google-auth<3,>=1.18.0->apache-beam->-r ./molecules/requirements.txt (line 1)) (0.2.8)
Requirement already satisfied: google-resumable-media!=0.4.0,<0.5.0dev,>=0.3.1 in /usr/local/lib/python3.7/dist-packages (from google-cloud-bigquery<3,>=1.6.0->apache-beam->-r ./molecules/requirements.txt (line 1)) (0.4.1)
Collecting grpc-google-iam-v1<0.13dev,>=0.12.3
Downloading grpc_google_iam_v1-0.12.4-py2.py3-none-any.whl (26 kB)
Collecting google-cloud-core<3,>=0.28.1
Downloading google_cloud_core-1.7.3-py2.py3-none-any.whl (28 kB)
Collecting google-cloud-dlp<4,>=3.0.0
Downloading google_cloud_dlp-3.8.0-py2.py3-none-any.whl (119 kB)
|████████████████████████████████| 119 kB 74.7 MB/s
Downloading google_cloud_dlp-3.7.1-py2.py3-none-any.whl (118 kB)
|████████████████████████████████| 118 kB 76.7 MB/s
Collecting grpcio-status>=1.16.0
Downloading grpcio_status-1.48.1-py3-none-any.whl (14 kB)
Collecting google-cloud-pubsub<3,>=2.1.0
Downloading google_cloud_pubsub-2.13.5-py2.py3-none-any.whl (234 kB)
|████████████████████████████████| 234 kB 53.4 MB/s
Downloading google_cloud_pubsub-2.13.4-py2.py3-none-any.whl (234 kB)
|████████████████████████████████| 234 kB 59.1 MB/s
Downloading google_cloud_pubsub-2.13.3-py2.py3-none-any.whl (234 kB)
|████████████████████████████████| 234 kB 57.9 MB/s
Downloading google_cloud_pubsub-2.13.2-py2.py3-none-any.whl (234 kB)
|████████████████████████████████| 234 kB 74.9 MB/s
Downloading google_cloud_pubsub-2.13.1-py2.py3-none-any.whl (234 kB)
|████████████████████████████████| 234 kB 66.3 MB/s
Collecting overrides<7.0.0,>=6.0.1
Downloading overrides-6.2.0-py3-none-any.whl (17 kB)
Collecting google-cloud-pubsublite<2,>=1.2.0
Downloading google_cloud_pubsublite-1.4.2-py2.py3-none-any.whl (265 kB)
|████████████████████████████████| 265 kB 69.1 MB/s
Collecting google-cloud-recommendations-ai<0.8.0,>=0.1.0
Downloading google_cloud_recommendations_ai-0.7.0-py2.py3-none-any.whl (148 kB)
|████████████████████████████████| 148 kB 52.3 MB/s
Downloading google_cloud_recommendations_ai-0.6.2-py2.py3-none-any.whl (147 kB)
|████████████████████████████████| 147 kB 65.8 MB/s
Collecting grpcio<2,>=1.33.1
Downloading grpcio-1.48.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.6 MB)
|████████████████████████████████| 4.6 MB 43.4 MB/s
Requirement already satisfied: pyasn1>=0.1.7 in /usr/local/lib/python3.7/dist-packages (from oauth2client>=1.4.12->google-apitools<0.5.32,>=0.5.31->apache-beam->-r ./molecules/requirements.txt (line 1)) (0.4.8)
Requirement already satisfied: termcolor>=1.1.0 in /usr/local/lib/python3.7/dist-packages (from tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.1.0)
Requirement already satisfied: tensorflow-io-gcs-filesystem>=0.23.1 in /usr/local/lib/python3.7/dist-packages (from tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (0.26.0)
Requirement already satisfied: libclang>=13.0.0 in /usr/local/lib/python3.7/dist-packages (from tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (14.0.6)
Collecting tensorboard<2.10,>=2.9
Downloading tensorboard-2.9.1-py3-none-any.whl (5.8 MB)
|████████████████████████████████| 5.8 MB 52.1 MB/s
Collecting protobuf<4,>=3.12.2
Downloading protobuf-3.19.4-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB)
|████████████████████████████████| 1.1 MB 59.3 MB/s
Requirement already satisfied: opt-einsum>=2.3.2 in /usr/local/lib/python3.7/dist-packages (from tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (3.3.0)
Collecting keras<2.10.0,>=2.9.0rc0
Downloading keras-2.9.0-py2.py3-none-any.whl (1.6 MB)
|████████████████████████████████| 1.6 MB 40.3 MB/s
Requirement already satisfied: wrapt>=1.11.0 in /usr/local/lib/python3.7/dist-packages (from tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.14.1)
Requirement already satisfied: keras-preprocessing>=1.1.1 in /usr/local/lib/python3.7/dist-packages (from tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.1.2)
Requirement already satisfied: h5py>=2.9.0 in /usr/local/lib/python3.7/dist-packages (from tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (3.1.0)
Collecting tensorflow-estimator<2.10.0,>=2.9.0rc0
Downloading tensorflow_estimator-2.9.0-py2.py3-none-any.whl (438 kB)
|████████████████████████████████| 438 kB 56.9 MB/s
Requirement already satisfied: google-pasta>=0.1.1 in /usr/local/lib/python3.7/dist-packages (from tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (0.2.0)
Collecting flatbuffers<2,>=1.12
Downloading flatbuffers-1.12-py2.py3-none-any.whl (15 kB)
Collecting gast<=0.4.0,>=0.2.1
Downloading gast-0.4.0-py3-none-any.whl (9.8 kB)
Requirement already satisfied: astunparse>=1.6.0 in /usr/local/lib/python3.7/dist-packages (from tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.6.3)
Requirement already satisfied: wheel<1.0,>=0.23.0 in /usr/local/lib/python3.7/dist-packages (from astunparse>=1.6.0->tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (0.37.1)
Requirement already satisfied: cached-property in /usr/local/lib/python3.7/dist-packages (from h5py>=2.9.0->tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.5.2)
Requirement already satisfied: tensorboard-data-server<0.7.0,>=0.6.0 in /usr/local/lib/python3.7/dist-packages (from tensorboard<2.10,>=2.9->tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (0.6.1)
Requirement already satisfied: markdown>=2.6.8 in /usr/local/lib/python3.7/dist-packages (from tensorboard<2.10,>=2.9->tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (3.4.1)
Requirement already satisfied: tensorboard-plugin-wit>=1.6.0 in /usr/local/lib/python3.7/dist-packages (from tensorboard<2.10,>=2.9->tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.8.1)
Requirement already satisfied: google-auth-oauthlib<0.5,>=0.4.1 in /usr/local/lib/python3.7/dist-packages (from tensorboard<2.10,>=2.9->tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (0.4.6)
Requirement already satisfied: werkzeug>=1.0.1 in /usr/local/lib/python3.7/dist-packages (from tensorboard<2.10,>=2.9->tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.0.1)
Requirement already satisfied: requests-oauthlib>=0.7.0 in /usr/local/lib/python3.7/dist-packages (from google-auth-oauthlib<0.5,>=0.4.1->tensorboard<2.10,>=2.9->tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.3.1)
Requirement already satisfied: importlib-metadata>=4.4 in /usr/local/lib/python3.7/dist-packages (from markdown>=2.6.8->tensorboard<2.10,>=2.9->tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (4.12.0)
Requirement already satisfied: zipp>=0.5 in /usr/local/lib/python3.7/dist-packages (from importlib-metadata>=4.4->markdown>=2.6.8->tensorboard<2.10,>=2.9->tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (3.8.1)
Requirement already satisfied: oauthlib>=3.0.0 in /usr/local/lib/python3.7/dist-packages (from requests-oauthlib>=0.7.0->google-auth-oauthlib<0.5,>=0.4.1->tensorboard<2.10,>=2.9->tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (3.2.0)
Requirement already satisfied: pandas<2,>=1.0 in /usr/local/lib/python3.7/dist-packages (from tfx-bsl<1.11.0,>=1.10.1->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.3.5)
Collecting tensorflow-serving-api!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<3,>=1.15
Downloading tensorflow_serving_api-2.9.1-py2.py3-none-any.whl (37 kB)
Requirement already satisfied: google-api-python-client<2,>=1.7.11 in /usr/local/lib/python3.7/dist-packages (from tfx-bsl<1.11.0,>=1.10.1->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (1.12.11)
Requirement already satisfied: uritemplate<4dev,>=3.0.0 in /usr/local/lib/python3.7/dist-packages (from google-api-python-client<2,>=1.7.11->tfx-bsl<1.11.0,>=1.10.1->tensorflow-transform->-r ./molecules/requirements.txt (line 2)) (3.0.1)
Building wheels for collected packages: dill, google-apitools, docopt
Building wheel for dill (setup.py) ... done
Created wheel for dill: filename=dill-0.3.1.1-py3-none-any.whl size=78544 sha256=478443e8e20137fe9bb2bbf0e118b0f10eccbaf61c0bb5011446fece7d165eec
Stored in directory: /root/.cache/pip/wheels/a4/61/fd/c57e374e580aa78a45ed78d5859b3a44436af17e22ca53284f
Building wheel for google-apitools (setup.py) ... done
Created wheel for google-apitools: filename=google_apitools-0.5.31-py3-none-any.whl size=131039 sha256=a5a28c4d3fdee688d7c0923227667dd3f10fdd21c896ba4fd2205bad3408e55e
Stored in directory: /root/.cache/pip/wheels/19/b5/2f/1cc3cf2b31e7a9cd1508731212526d9550271274d351c96f16
Building wheel for docopt (setup.py) ... done
Created wheel for docopt: filename=docopt-0.6.2-py2.py3-none-any.whl size=13723 sha256=5c6c07ea1ac1d493074052b20d9f3fd910574cb6a6400fe00227b5324bf324fa
Stored in directory: /root/.cache/pip/wheels/72/b0/3f/1d95f96ff986c7dfffe46ce2be4062f38ebd04b506c77c81b9
Successfully built dill google-apitools docopt
Installing collected packages: protobuf, requests, grpcio, proto-plus, grpcio-status, grpcio-gcp, grpc-google-iam-v1, docopt, tensorflow-estimator, tensorboard, pymongo, overrides, orjson, keras, hdfs, google-cloud-pubsub, google-cloud-core, gast, flatbuffers, fasteners, fastavro, dill, cloudpickle, tensorflow, google-cloud-vision, google-cloud-videointelligence, google-cloud-spanner, google-cloud-recommendations-ai, google-cloud-pubsublite, google-cloud-language, google-cloud-dlp, google-cloud-bigtable, google-cloud-bigquery-storage, google-auth-httplib2, google-apitools, apache-beam, tensorflow-serving-api, tfx-bsl, tensorflow-transform
Attempting uninstall: protobuf
Found existing installation: protobuf 3.17.3
Uninstalling protobuf-3.17.3:
Successfully uninstalled protobuf-3.17.3
Attempting uninstall: requests
Found existing installation: requests 2.23.0
Uninstalling requests-2.23.0:
Successfully uninstalled requests-2.23.0
Attempting uninstall: grpcio
Found existing installation: grpcio 1.47.0
Uninstalling grpcio-1.47.0:
Successfully uninstalled grpcio-1.47.0
Attempting uninstall: tensorflow-estimator
Found existing installation: tensorflow-estimator 2.8.0
Uninstalling tensorflow-estimator-2.8.0:
Successfully uninstalled tensorflow-estimator-2.8.0
Attempting uninstall: tensorboard
Found existing installation: tensorboard 2.8.0
Uninstalling tensorboard-2.8.0:
Successfully uninstalled tensorboard-2.8.0
Attempting uninstall: pymongo
Found existing installation: pymongo 4.2.0
Uninstalling pymongo-4.2.0:
Successfully uninstalled pymongo-4.2.0
Attempting uninstall: keras
Found existing installation: keras 2.8.0
Uninstalling keras-2.8.0:
Successfully uninstalled keras-2.8.0
Attempting uninstall: google-cloud-core
Found existing installation: google-cloud-core 1.0.3
Uninstalling google-cloud-core-1.0.3:
Successfully uninstalled google-cloud-core-1.0.3
Attempting uninstall: gast
Found existing installation: gast 0.5.3
Uninstalling gast-0.5.3:
Successfully uninstalled gast-0.5.3
Attempting uninstall: flatbuffers
Found existing installation: flatbuffers 2.0.7
Uninstalling flatbuffers-2.0.7:
Successfully uninstalled flatbuffers-2.0.7
Attempting uninstall: dill
Found existing installation: dill 0.3.5.1
Uninstalling dill-0.3.5.1:
Successfully uninstalled dill-0.3.5.1
Attempting uninstall: cloudpickle
Found existing installation: cloudpickle 1.5.0
Uninstalling cloudpickle-1.5.0:
Successfully uninstalled cloudpickle-1.5.0
Attempting uninstall: tensorflow
Found existing installation: tensorflow 2.8.2+zzzcolab20220719082949
Uninstalling tensorflow-2.8.2+zzzcolab20220719082949:
Successfully uninstalled tensorflow-2.8.2+zzzcolab20220719082949
Attempting uninstall: google-cloud-language
Found existing installation: google-cloud-language 1.2.0
Uninstalling google-cloud-language-1.2.0:
Successfully uninstalled google-cloud-language-1.2.0
Attempting uninstall: google-cloud-bigquery-storage
Found existing installation: google-cloud-bigquery-storage 1.1.2
Uninstalling google-cloud-bigquery-storage-1.1.2:
Successfully uninstalled google-cloud-bigquery-storage-1.1.2
Attempting uninstall: google-auth-httplib2
Found existing installation: google-auth-httplib2 0.0.4
Uninstalling google-auth-httplib2-0.0.4:
Successfully uninstalled google-auth-httplib2-0.0.4
Successfully installed apache-beam-2.41.0 cloudpickle-2.1.0 dill-0.3.1.1 docopt-0.6.2 fastavro-1.6.0 fasteners-0.17.3 flatbuffers-1.12 gast-0.4.0 google-apitools-0.5.31 google-auth-httplib2-0.1.0 google-cloud-bigquery-storage-2.13.2 google-cloud-bigtable-1.7.2 google-cloud-core-1.7.3 google-cloud-dlp-3.7.1 google-cloud-language-1.3.2 google-cloud-pubsub-2.13.1 google-cloud-pubsublite-1.4.2 google-cloud-recommendations-ai-0.6.2 google-cloud-spanner-1.19.3 google-cloud-videointelligence-1.16.3 google-cloud-vision-1.0.2 grpc-google-iam-v1-0.12.4 grpcio-1.48.1 grpcio-gcp-0.2.2 grpcio-status-1.48.1 hdfs-2.7.0 keras-2.9.0 orjson-3.8.0 overrides-6.2.0 proto-plus-1.22.1 protobuf-3.19.4 pymongo-3.12.3 requests-2.28.1 tensorboard-2.9.1 tensorflow-2.9.2 tensorflow-estimator-2.9.0 tensorflow-serving-api-2.9.1 tensorflow-transform-1.10.1 tfx-bsl-1.10.1
Note: In Google Colab, you need to restart the runtime at this point to finalize updating the packages you just installed. You can do so by clicking the Restart Runtime button at the end of the output cell above (after installation), or by selecting Runtime > Restart Runtime in the Menu bar. Please do not proceed to the next section without restarting.
Next, you'll define the working directory to contain all the results you will generate in this exercise. After each phase, you can open the Colab file explorer on the left and look under the results directory to see the new files and directories generated.
# Define working directory
WORK_DIR = "results"
With that, you are now ready to execute the pipeline. As shown in the figure earlier, the logic is already implemented in the four main scripts. You will run them one by one, and the next sections will discuss the relevant details and the outputs generated.
The first step is to extract the input data. The dataset is stored as SDF files and is extracted from the National Center for Biotechnology Information (FTP source). Chapter 6 of this document gives a more detailed description of the SDF file format.
The data-extractor.py file extracts and decompresses the specified SDF files. In later steps, the example preprocesses these files and uses the data to train and evaluate the machine learning model. The script downloads the SDF files from the public source and stores them in a subdirectory inside the specified working directory.
As you can see here, the complete set of files is huge and can easily exceed storage limits in Colab. For this exercise, you will just download one file. You can use the script as shown in the cells below:
# Print the help documentation. You can ignore references to GCP because you will be running everything in Colab.
!python ./molecules/data-extractor.py --help
usage: data-extractor.py [-h] --work-dir WORK_DIR
[--data-sources DATA_SOURCES [DATA_SOURCES ...]]
[--filter-regex FILTER_REGEX] --max-data-files
MAX_DATA_FILES
optional arguments:
-h, --help show this help message and exit
--work-dir WORK_DIR Directory for staging and working files. This can be a
Google Cloud Storage path. (default: None)
--data-sources DATA_SOURCES [DATA_SOURCES ...]
Data source location where SDF file(s) are stored.
Paths can be local, ftp://<path>, or gcs://<path>.
Examples: ftp://hostname/path
ftp://username:password@hostname/path (default: ['ftp:
//anonymous:guest@ftp.ncbi.nlm.nih.gov/pubchem/Compoun
d_3D/01_conf_per_cmpd/SDF'])
--filter-regex FILTER_REGEX
Regular expression to filter which files to use. The
regular expression will be searched on the full
absolute path. Every match will be kept. (default:
\.sdf)
--max-data-files MAX_DATA_FILES
Maximum number of data files for every file pattern
expansion. Set to -1 to use all files. (default: None)
# Run the data extractor
!python ./molecules/data-extractor.py --max-data-files 1 --work-dir={WORK_DIR}
Found 6347 files, using 1
Extracting data files...
Extracted results/data/00000001_00025000.sdf
You should now have a new folder in your work directory called data. This will contain the SDF file you downloaded.
# List working directory
!ls {WORK_DIR}
data
The SDF documentation linked earlier shows that each record is terminated by $$$$. You can use the command below to print the first record in the file. As you'll see, even a single record is already pretty long. In the next phase, you'll feed these records into a pipeline that will transform them into a form that can be consumed by our model. (A small sketch of how a file can be split into records on this delimiter is shown after the sample output below.)
# Print one record
!sed '/$$$$/q' {WORK_DIR}/data/00000001_00025000.sdf
1
-OEChem-05192200403D
31 30 0 1 0 0 0 0 0999 V2000
0.3387 0.9262 0.4600 O 0 0 0 0 0 0 0 0 0 0 0 0
3.4786 -1.7069 -0.3119 O 0 5 0 0 0 0 0 0 0 0 0 0
1.8428 -1.4073 1.2523 O 0 0 0 0 0 0 0 0 0 0 0 0
0.4166 2.5213 -1.2091 O 0 0 0 0 0 0 0 0 0 0 0 0
-2.2359 -0.7251 0.0270 N 0 3 0 0 0 0 0 0 0 0 0 0
-0.7783 -1.1579 0.0914 C 0 0 0 0 0 0 0 0 0 0 0 0
0.1368 -0.0961 -0.5161 C 0 0 2 0 0 0 0 0 0 0 0 0
-3.1119 -1.7972 0.6590 C 0 0 0 0 0 0 0 0 0 0 0 0
-2.4103 0.5837 0.7840 C 0 0 0 0 0 0 0 0 0 0 0 0
-2.6433 -0.5289 -1.4260 C 0 0 0 0 0 0 0 0 0 0 0 0
1.4879 -0.6438 -0.9795 C 0 0 0 0 0 0 0 0 0 0 0 0
2.3478 -1.3163 0.1002 C 0 0 0 0 0 0 0 0 0 0 0 0
0.4627 2.1935 -0.0312 C 0 0 0 0 0 0 0 0 0 0 0 0
0.6678 3.1549 1.1001 C 0 0 0 0 0 0 0 0 0 0 0 0
-0.7073 -2.1051 -0.4563 H 0 0 0 0 0 0 0 0 0 0 0 0
-0.5669 -1.3392 1.1503 H 0 0 0 0 0 0 0 0 0 0 0 0
-0.3089 0.3239 -1.4193 H 0 0 0 0 0 0 0 0 0 0 0 0
-2.9705 -2.7295 0.1044 H 0 0 0 0 0 0 0 0 0 0 0 0
-2.8083 -1.9210 1.7028 H 0 0 0 0 0 0 0 0 0 0 0 0
-4.1563 -1.4762 0.6031 H 0 0 0 0 0 0 0 0 0 0 0 0
-2.0398 1.4170 0.1863 H 0 0 0 0 0 0 0 0 0 0 0 0
-3.4837 0.7378 0.9384 H 0 0 0 0 0 0 0 0 0 0 0 0
-1.9129 0.5071 1.7551 H 0 0 0 0 0 0 0 0 0 0 0 0
-2.2450 0.4089 -1.8190 H 0 0 0 0 0 0 0 0 0 0 0 0
-2.3000 -1.3879 -2.0100 H 0 0 0 0 0 0 0 0 0 0 0 0
-3.7365 -0.4723 -1.4630 H 0 0 0 0 0 0 0 0 0 0 0 0
1.3299 -1.3744 -1.7823 H 0 0 0 0 0 0 0 0 0 0 0 0
2.0900 0.1756 -1.3923 H 0 0 0 0 0 0 0 0 0 0 0 0
-0.1953 3.1280 1.7699 H 0 0 0 0 0 0 0 0 0 0 0 0
0.7681 4.1684 0.7012 H 0 0 0 0 0 0 0 0 0 0 0 0
1.5832 2.9010 1.6404 H 0 0 0 0 0 0 0 0 0 0 0 0
1 7 1 0 0 0 0
1 13 1 0 0 0 0
2 12 1 0 0 0 0
3 12 2 0 0 0 0
4 13 2 0 0 0 0
5 6 1 0 0 0 0
5 8 1 0 0 0 0
5 9 1 0 0 0 0
5 10 1 0 0 0 0
6 7 1 0 0 0 0
6 15 1 0 0 0 0
6 16 1 0 0 0 0
7 11 1 0 0 0 0
7 17 1 0 0 0 0
8 18 1 0 0 0 0
8 19 1 0 0 0 0
8 20 1 0 0 0 0
9 21 1 0 0 0 0
9 22 1 0 0 0 0
9 23 1 0 0 0 0
10 24 1 0 0 0 0
10 25 1 0 0 0 0
10 26 1 0 0 0 0
11 12 1 0 0 0 0
11 27 1 0 0 0 0
11 28 1 0 0 0 0
13 14 1 0 0 0 0
14 29 1 0 0 0 0
14 30 1 0 0 0 0
14 31 1 0 0 0 0
M CHG 2 2 -1 5 1
M END
> <PUBCHEM_COMPOUND_CID>
1
> <PUBCHEM_CONFORMER_RMSD>
0.6
> <PUBCHEM_CONFORMER_DIVERSEORDER>
2
43
65
46
25
35
57
19
53
42
34
37
41
50
30
14
13
10
56
28
55
22
17
44
52
48
21
7
61
16
66
36
12
32
40
1
24
29
63
47
9
39
60
5
20
31
62
51
4
59
67
8
18
11
33
26
6
27
64
15
58
54
23
38
3
45
49
> <PUBCHEM_MMFF94_PARTIAL_CHARGES>
14
1 -0.43
10 0.5
11 -0.11
12 0.91
13 0.66
14 0.06
2 -0.9
3 -0.9
4 -0.57
5 -1.01
6 0.5
7 0.28
8 0.5
9 0.5
> <PUBCHEM_EFFECTIVE_ROTOR_COUNT>
6
> <PUBCHEM_PHARMACOPHORE_FEATURES>
5
1 2 acceptor
1 3 acceptor
1 4 acceptor
1 5 cation
3 2 3 12 anion
> <PUBCHEM_HEAVY_ATOM_COUNT>
14
> <PUBCHEM_ATOM_DEF_STEREO_COUNT>
0
> <PUBCHEM_ATOM_UDEF_STEREO_COUNT>
1
> <PUBCHEM_BOND_DEF_STEREO_COUNT>
0
> <PUBCHEM_BOND_UDEF_STEREO_COUNT>
0
> <PUBCHEM_ISOTOPIC_ATOM_COUNT>
0
> <PUBCHEM_COMPONENT_COUNT>
1
> <PUBCHEM_CACTVS_TAUTO_COUNT>
1
> <PUBCHEM_CONFORMER_ID>
0000000100000002
> <PUBCHEM_MMFF94_ENERGY>
37.801
> <PUBCHEM_FEATURE_SELFOVERLAP>
25.427
> <PUBCHEM_SHAPE_FINGERPRINT>
1 1 17907859857256425260
13132413 78 18339935856441330356
16945 1 18127404777055172104
17841504 4 18338806718360982307
18410436 195 18412821378365737484
20361792 2 18413103948606886951
20645477 70 18193836175106948431
20653091 64 18337681930618404851
20711985 327 18273495675867710310
20711985 344 18052533275153547866
21041028 32 18342473533857807689
21061003 4 18410298003707379195
21524375 3 17335906067529293413
22112679 90 18128282041358100696
23419403 2 17977062926062270852
23552423 10 18193564595396549919
23557571 272 18127697028774774262
23598294 1 17832149325056171186
2748010 2 18339911658547624660
305870 269 17981602981145137625
31174 14 18192722361058170003
528862 383 18124596637411617035
7364860 26 18197783412505576099
81228 2 18051694343465326048
81539 233 17831573545929999781
> <PUBCHEM_SHAPE_MULTIPOLES>
259.66
4.28
3.04
1.21
1.75
2.55
0.16
-3.13
-0.22
-2.18
-0.56
0.21
0.17
0.09
> <PUBCHEM_SHAPE_SELFOVERLAP>
494.342
> <PUBCHEM_SHAPE_VOLUME>
160.7
> <PUBCHEM_COORDINATE_TYPE>
2
5
10
$$$$
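As referenced above, here is a small sketch of how a file like this could be split into individual records on the $$$$ delimiter. This is only a simplified illustration; the actual parsing in pubchem/sdf.py is more thorough.
# Simplified sketch: split an SDF file into records on the '$$$$' terminator.
def split_sdf_records(path):
    with open(path) as f:
        record_lines = []
        for line in f:
            if line.strip() == '$$$$':
                yield ''.join(record_lines)
                record_lines = []
            else:
                record_lines.append(line)

# Example usage with the file downloaded earlier:
records = split_sdf_records('results/data/00000001_00025000.sdf')
print(next(records)[:200])  # preview the start of the first record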
The next script, preprocess.py, uses an Apache Beam pipeline to preprocess the data. The pipeline performs the following preprocessing actions:
1. Reads and parses the extracted SDF files.
2. Counts the carbon, hydrogen, oxygen, and nitrogen atoms in each molecule.
3. Normalizes the counts to values between 0 and 1 using tf.Transform.
4. Partitions the data into a training dataset and an evaluation dataset.
5. Writes the two datasets as TFRecords.
Apache Beam transforms can efficiently manipulate single elements at a time, but transforms that require a full pass of the dataset cannot easily be done with only Apache Beam and are better done using tf.Transform. Because of this, the code uses Apache Beam transforms to read and format the molecules, and to count the atoms in each molecule. The code then uses tf.Transform to find the global minimum and maximum counts in order to normalize the data.
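For intuition, an element-wise (per-molecule) Beam step that counts atoms might look roughly like the sketch below. It is a simplified stand-in for the feature extraction in pubchem/pipeline.py; only the feature names TotalC, TotalH, TotalO, and TotalN come from the lab, the rest is illustrative.
import apache_beam as beam

def count_atoms(molecule_atoms):
    """molecule_atoms: list of atom symbols, e.g. ['C', 'H', 'H', 'O']."""
    return {
        'TotalC': molecule_atoms.count('C'),
        'TotalH': molecule_atoms.count('H'),
        'TotalO': molecule_atoms.count('O'),
        'TotalN': molecule_atoms.count('N'),
    }

# Element-wise counting needs no global state, so plain Beam transforms suffice.
with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | 'Create molecules' >> beam.Create([['C', 'H', 'H', 'H', 'O']])
        | 'Count atoms' >> beam.Map(count_atoms)
        | 'Print' >> beam.Map(print)
    )
Finding the global minimum and maximum of these counts, by contrast, requires seeing every molecule, which is exactly the kind of full-pass work handed off to tf.Transform.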
The following image shows the steps in the pipeline.

You will run the script first and the following sections will discuss the relevant parts of this code. This will take around 6 minutes to run.
# Print help documentation
!python ./molecules/preprocess.py --help
usage: preprocess.py [-h] --work-dir WORK_DIR
optional arguments:
-h, --help show this help message and exit
--work-dir WORK_DIR Directory for staging and working files. This can be a
Google Cloud Storage path. (default: None)
# Run the preprocessing script
!python ./molecules/preprocess.py --work-dir={WORK_DIR}
2022-09-05 03:52:46.909003: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
You should now have a few more outputs in your work directory. Most important are:
- train-dataset and eval-dataset - the TFRecord files for training and evaluation
- PreprocessData - a serialized file with Python objects needed in the training phase (described below)
- the tf.Transform outputs (transform_fn and transformed_metadata)
# List working directory
!ls {WORK_DIR}
data PreprocessData train-dataset transform_fn eval-dataset tft-temp transformed_metadata
The training and evaluation datasets contain TFRecords and you can view them by running the helper function in the cells below.
from google.protobuf.json_format import MessageToDict
# Define a helper function to get individual examples
def get_records(dataset, num_records):
    '''Extracts records from the given dataset.
    Args:
        dataset (TFRecordDataset): dataset saved in the preprocessing step
        num_records (int): number of records to preview
    '''
    # Initialize an empty list
    records = []
    # Use the `take()` method to specify how many records to get
    for tfrecord in dataset.take(num_records):
        # Get the numpy property of the tensor
        serialized_example = tfrecord.numpy()
        # Initialize a `tf.train.Example()` to read the serialized data
        example = tf.train.Example()
        # Read the example data (output is a protocol buffer message)
        example.ParseFromString(serialized_example)
        # Convert the protocol buffer message to a Python dictionary
        example_dict = MessageToDict(example)
        # Append to the records list
        records.append(example_dict)
    return records
import tensorflow as tf
from pprint import pprint
# Create TF Dataset from TFRecord of training set
train_data = tf.data.TFRecordDataset(f'{WORK_DIR}/train-dataset/part-00000-of-00001')
# Print two records
test_data = get_records(train_data, 2)
pprint(test_data)
[{'features': {'feature': {'Energy': {'floatList': {'value': [44.1107]}},
'NormalizedC': {'floatList': {'value': [0.21428572]}},
'NormalizedH': {'floatList': {'value': [0.28125]}},
'NormalizedN': {'floatList': {'value': [0.083333336]}},
'NormalizedO': {'floatList': {'value': [0.1904762]}}}}},
{'features': {'feature': {'Energy': {'floatList': {'value': [19.4085]}},
'NormalizedC': {'floatList': {'value': [0.16666667]}},
'NormalizedH': {'floatList': {'value': [0.125]}},
'NormalizedN': {'floatList': {'value': [0.0]}},
'NormalizedO': {'floatList': {'value': [0.1904762]}}}}}]
Note: From the output cell above, you might surmise that we'll need more than the atom counts to make better predictions. Molecules with identical atom counts can still have different Energy values, so you cannot expect the model to reach a very low loss during the training phase later. For simplicity, we'll just use atom counts in this exercise, but feel free to revise it later to include more predictive features. You can share your findings in our Discourse community to discuss with other learners who are interested in the same problem.
The PreprocessData file contains Python objects needed in the training phase, such as:
- the input feature spec
- the labels
- the file patterns of the training and evaluation datasets
These were saved in a serialized file using dill when you ran the preprocess script earlier, and you can deserialize the file using the cell below to view its contents.
import dill as pickle
# Helper function to load the serialized file
def load(filename):
    with tf.io.gfile.GFile(filename, 'rb') as f:
        return pickle.load(f)
# Load PreprocessData
preprocess_data = load('/content/results/PreprocessData')
# Print contents
pprint(vars(preprocess_data))
{'eval_files_pattern': 'results/eval-dataset/part*',
'input_feature_spec': {'Energy': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
'TotalC': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
'TotalH': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
'TotalN': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
'TotalO': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None)},
'labels': ['Energy'],
'train_files_pattern': 'results/train-dataset/part*'}
The next sections will describe how these are implemented as a Beam pipeline in preprocess.py. You can open this file in a separate text editor so you can look at it more closely while reviewing the snippets below.
The preprocess.py code creates an Apache Beam pipeline.
The Molecules code sample uses a deep neural network regressor to make predictions. The general recommendation is to normalize the inputs before feeding them into the ML model. The pipeline uses tf.Transform to normalize the counts of each atom to values between 0 and 1, i.e. each count x is mapped to (x - min) / (max - min), where min and max are computed over the whole dataset. To read more about normalizing inputs, see feature scaling.
Normalizing the values requires a full pass through the dataset, recording the minimum and maximum values. The code uses tf.Transform to go through the entire dataset and apply full-pass transforms.
To use tf.Transform, the code must provide a function that contains the logic of the transform to perform on the dataset. In preprocess.py, the code uses the AnalyzeAndTransformDataset transform provided by tf.Transform. Learn more about how to use tf.Transform.
# Apply the tf.Transform preprocessing_fn
input_metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(input_feature_spec))

dataset_and_metadata, transform_fn = (
    (dataset, input_metadata)
    | 'Feature scaling' >> beam_impl.AnalyzeAndTransformDataset(
        feature_scaling))
dataset, metadata = dataset_and_metadata
In preprocess.py, the feature_scaling function used is normalize_inputs, which is defined in pubchem/pipeline.py. The function uses the tf.Transform function scale_to_0_1 to normalize the counts to values between 0 and 1.
def normalize_inputs(inputs):
    """Preprocessing function for tf.Transform (full-pass transformations).

    Here we will do any preprocessing that requires a full-pass of the dataset.
    It takes as inputs the preprocessed data from the `PTransform` we specify, in
    this case `SimpleFeatureExtraction`.

    Common operations might be scaling values to 0-1, getting the minimum or
    maximum value of a certain field, creating a vocabulary for a string field.

    There are two main types of transformations supported by tf.Transform, for
    more information, check the following modules:
    - analyzers: tensorflow_transform.analyzers.py
    - mappers: tensorflow_transform.mappers.py

    Any transformation done in tf.Transform will be embedded into the TensorFlow
    model itself.
    """
    return {
        # Scale the input features for normalization
        'NormalizedC': tft.scale_to_0_1(inputs['TotalC']),
        'NormalizedH': tft.scale_to_0_1(inputs['TotalH']),
        'NormalizedO': tft.scale_to_0_1(inputs['TotalO']),
        'NormalizedN': tft.scale_to_0_1(inputs['TotalN']),
        # Do not scale the label since we want the absolute number for prediction
        'Energy': inputs['Energy'],
    }
Next, the preprocess.py pipeline partitions the single dataset into two datasets. It allocates approximately 80% of the data to be used as training data, and approximately 20% of the data to be used as evaluation data.
# Split the dataset into a training set and an evaluation set
assert 0 < eval_percent < 100, 'eval_percent must in the range (0-100)'
train_dataset, eval_dataset = (
    dataset
    | 'Split dataset' >> beam.Partition(
        lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2))
Finally, the preprocess.py pipeline writes the two datasets (training and evaluation) using the WriteToTFRecord transform.
# Write the datasets as TFRecords
coder = example_proto_coder.ExampleProtoCoder(metadata.schema)

train_dataset_prefix = os.path.join(train_dataset_dir, 'part')
_ = (
    train_dataset
    | 'Write train dataset' >> tfrecordio.WriteToTFRecord(
        train_dataset_prefix, coder))

eval_dataset_prefix = os.path.join(eval_dataset_dir, 'part')
_ = (
    eval_dataset
    | 'Write eval dataset' >> tfrecordio.WriteToTFRecord(
        eval_dataset_prefix, coder))

# Write the transform_fn
_ = (
    transform_fn
    | 'Write transformFn' >> transform_fn_io.WriteTransformFn(work_dir))
Recall that at the end of the preprocessing phase, the code split the data into two datasets (training and evaluation).
The script uses a simple dense neural network for the regression problem. The trainer/task.py file contains the code for training the model. The main function of trainer/task.py loads the parameters needed from the preprocessing phase and passes them to the task runner function (i.e. run_fn).
In this exercise, we will not focus too much on the training metrics (e.g. accuracy). That is discussed in other courses of this specialization. The main objective is to look at the outputs and how they are connected to the prediction phase.
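Based on the model summary printed further below, the network built by the trainer is roughly equivalent to the following Keras sketch. The layer sizes come from that summary; the activations and optimizer here are assumptions, not necessarily what trainer/task.py uses.
import tensorflow as tf

# Rough Keras equivalent of the regressor summarized in the trainer output.
inputs = {
    name: tf.keras.Input(shape=(1,), name=name)
    for name in ['NormalizedC', 'NormalizedH', 'NormalizedN', 'NormalizedO']
}
x = tf.keras.layers.Concatenate()(list(inputs.values()))
x = tf.keras.layers.Dense(128, activation='relu')(x)
x = tf.keras.layers.Dense(64, activation='relu')(x)
output = tf.keras.layers.Dense(1)(x)  # single regression output: Energy

model = tf.keras.Model(inputs=inputs, outputs=output)
model.compile(optimizer='adam', loss='mse')
model.summary()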
# Print help documentation
!python ./molecules/trainer/task.py --help
usage: task.py [-h] --work-dir WORK_DIR
optional arguments:
-h, --help show this help message and exit
--work-dir WORK_DIR Directory for staging and working files. This can be a
Google Cloud Storage path. (default: None)
# Run the trainer.
!python ./molecules/trainer/task.py --work-dir {WORK_DIR}
2022-09-05 03:53:37.288814: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Model: "model"
__________________________________________________________________________________________________
Layer (type) Output Shape Param # Connected to
==================================================================================================
NormalizedC (InputLayer) [(None, 1)] 0 []
NormalizedH (InputLayer) [(None, 1)] 0 []
NormalizedN (InputLayer) [(None, 1)] 0 []
NormalizedO (InputLayer) [(None, 1)] 0 []
concatenate (Concatenate) (None, 4) 0 ['NormalizedC[0][0]',
'NormalizedH[0][0]',
'NormalizedN[0][0]',
'NormalizedO[0][0]']
dense (Dense) (None, 128) 640 ['concatenate[0][0]']
dense_1 (Dense) (None, 64) 8256 ['dense[0][0]']
dense_2 (Dense) (None, 1) 65 ['dense_1[0][0]']
==================================================================================================
Total params: 8,961
Trainable params: 8,961
Non-trainable params: 0
__________________________________________________________________________________________________
100/100 [==============================] - 3s 30ms/step - loss: 638.2022 - accuracy: 0.0011 - val_loss: 298.0405 - val_accuracy: 0.0017
WARNING:absl:Function `serve_tf_examples_fn` contains input name(s) ID, TotalC, TotalH, TotalN, TotalO with unsupported characters which will be renamed to id, totalc, totalh, totaln, totalo in the SavedModel.
The outputs of this phase are in the model directory. This will be the trained model that you will use for predictions.
!ls {WORK_DIR}/model
assets keras_metadata.pb saved_model.pb variables
The important thing to note in the training script is that it also exports the transformation graph together with the model. The export attaches a serving signature built from _get_serve_tf_examples_fn(), which wraps the tf.Transform graph around the trained model.
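The code below is a representative sketch of this pattern based on the standard tf.Transform serving recipe; the exact implementation in trainer/task.py may differ in details, and the export_model helper and label_key argument here are illustrative.
import tensorflow as tf
import tensorflow_transform as tft

def _get_serve_tf_examples_fn(model, tf_transform_output, label_key='Energy'):
    # Keep a reference so the transform graph is tracked and exported with the model.
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(label_key)  # the label is not an input at serving time
        parsed = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed = model.tft_layer(parsed)  # same transforms as in training
        return model(transformed)

    return serve_tf_examples_fn

def export_model(model, work_dir, model_dir):
    # Illustrative helper: export the model together with the transform graph.
    tf_transform_output = tft.TFTransformOutput(work_dir)
    signatures = {
        'serving_default':
            _get_serve_tf_examples_fn(model, tf_transform_output)
            .get_concrete_function(
                tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')),
    }
    model.save(model_dir, save_format='tf', signatures=signatures)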
The use of model.tft_layer means that your model can accept raw data and it will do the transformation before feeding it to make predictions. It implies that when you serve your model for predictions, you don't have to worry about creating a pipeline to transform new data coming in. The model will already do that for you through this serving input function. It helps to prevent training-serving skew since you're handling the training and serving data the same way.
After training the model, you can provide the model with inputs and it will make predictions. The pipeline in predict.py is responsible for making predictions. It reads the input files from the custom source and writes the output predictions as text files to the specified working directory.
In `predict.py`, the code defines the pipeline in the run function; a structural sketch of it is shown at the end of this section.
First, the code passes the pubchem.SimpleFeatureExtraction(source) transform as the feature_extraction transform. This transform, which was also used in the preprocessing phase, is applied to the pipeline.
The transform reads from the appropriate source based on the pipeline’s execution mode (i.e. batch), formats the molecules, and counts the different atoms in each molecule.
Next, beam.ParDo(Predict(…)) is applied to the pipeline that performs the prediction of the molecular energy. Predict, the DoFn that's passed, uses the given dictionary of input features (atom counts), to predict the molecular energy.
The next transform applied to the pipeline is beam.Map(lambda result: json.dumps(result)), which takes the prediction result dictionary and serializes it into a JSON string. Finally, the output is written to the sink.
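Putting these steps together, the core of the batch pipeline in predict.py is structured roughly like the self-contained sketch below. The Predict DoFn here is only a stub (the real one loads the SavedModel), and the input element and output path are made up for illustration.
import json
import apache_beam as beam

class Predict(beam.DoFn):
    """Stub standing in for the real Predict DoFn in predict.py."""
    def __init__(self, model_dir):
        self.model_dir = model_dir

    def process(self, inputs):
        # The real DoFn runs the SavedModel on the atom counts; here we just
        # echo a placeholder prediction to show the shape of the data flow.
        yield {'id': inputs.get('ID'), 'prediction': [0.0]}

with beam.Pipeline() as p:
    _ = (
        p
        # In predict.py this step is pubchem.SimpleFeatureExtraction(source).
        | 'Feature extraction' >> beam.Create(
            [{'ID': 1, 'TotalC': 3, 'TotalH': 8, 'TotalO': 1, 'TotalN': 0}])
        | 'Predict' >> beam.ParDo(Predict(model_dir='results/model'))
        | 'Format as JSON' >> beam.Map(json.dumps)
        # In predict.py the sink writes to the requested outputs directory.
        | 'Write predictions' >> beam.io.WriteToText('/tmp/predictions_sketch/part')
    )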
Batch predictions are optimized for throughput rather than latency. They work best if you're making many predictions and you can wait for all of them to finish before getting the results. You can run the following cells to use the script for batch predictions. For simplicity, you will use the same file you used for training. If you want, however, you can use the data extractor script from earlier to grab a different SDF file and feed it here.
# Print help documentation. You can ignore references to GCP and streaming data.
!python ./molecules/predict.py --help
usage: predict.py [-h] --work-dir WORK_DIR --model-dir MODEL_DIR
{batch,stream} ...
positional arguments:
{batch,stream}
batch Batch prediction
stream Streaming prediction
optional arguments:
-h, --help show this help message and exit
--work-dir WORK_DIR Directory for temporary files and preprocessed
datasets to. This can be a Google Cloud Storage path.
(default: None)
--model-dir MODEL_DIR
Path to the exported TensorFlow model. This can be a
Google Cloud Storage path. (default: None)
# Define model, input and output data directories
MODEL_DIR = f'{WORK_DIR}/model'
DATA_DIR = f'{WORK_DIR}/data'
PRED_DIR = f'{WORK_DIR}/predictions'
# Run batch prediction. This will take around 7 minutes.
!python ./molecules/predict.py \
--model-dir {MODEL_DIR} \
--work-dir {WORK_DIR} \
batch \
--inputs-dir {DATA_DIR} \
--outputs-dir {PRED_DIR}
Listening...
2022-09-05 03:53:53.601195: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
The results should now be in the predictions folder. The output is just a text file, so you can easily print it.
# List working directory
!ls {WORK_DIR}
data model PreprocessData train-dataset transform_fn eval-dataset predictions tft-temp transformed_metadata
# Print the first 100 results
!head -n 100 /content/results/predictions/part-00000-of-00001
{"id": [1], "prediction": [37.973697662353516]}
{"id": [2], "prediction": [38.41432571411133]}
{"id": [3], "prediction": [36.203392028808594]}
{"id": [4], "prediction": [-3.051398992538452]}
{"id": [5], "prediction": [37.349037170410156]}
{"id": [6], "prediction": [23.88512420654297]}
{"id": [7], "prediction": [-26.62262535095215]}
{"id": [8], "prediction": [36.82102966308594]}
{"id": [9], "prediction": [88.31299591064453]}
{"id": [11], "prediction": [-12.124573707580566]}
{"id": [12], "prediction": [34.177249908447266]}
{"id": [13], "prediction": [-7.9857048988342285]}
{"id": [14], "prediction": [71.87216186523438]}
{"id": [16], "prediction": [18.70582389831543]}
{"id": [17], "prediction": [31.277639389038086]}
{"id": [18], "prediction": [65.68929290771484]}
{"id": [19], "prediction": [35.32212448120117]}
{"id": [20], "prediction": [39.37439727783203]}
{"id": [21], "prediction": [35.93977737426758]}
{"id": [22], "prediction": [33.91363525390625]}
{"id": [23], "prediction": [41.4879035949707]}
{"id": [26], "prediction": [27.599714279174805]}
{"id": [28], "prediction": [30.13276481628418]}
{"id": [29], "prediction": [15.606612205505371]}
{"id": [30], "prediction": [19.922500610351562]}
{"id": [31], "prediction": [66.74757385253906]}
{"id": [32], "prediction": [55.74264907836914]}
{"id": [33], "prediction": [-2.354940176010132]}
{"id": [34], "prediction": [-1.473679780960083]}
{"id": [35], "prediction": [33.73662567138672]}
{"id": [36], "prediction": [48.17616653442383]}
{"id": [37], "prediction": [56.36030197143555]}
{"id": [38], "prediction": [35.93977737426758]}
{"id": [39], "prediction": [24.848249435424805]}
{"id": [40], "prediction": [47.031288146972656]}
{"id": [41], "prediction": [56.80093002319336]}
{"id": [42], "prediction": [59.79493713378906]}
{"id": [43], "prediction": [44.123897552490234]}
{"id": [44], "prediction": [39.19036865234375]}
{"id": [45], "prediction": [40.071632385253906]}
{"id": [46], "prediction": [87.43173217773438]}
{"id": [47], "prediction": [25.729507446289062]}
{"id": [48], "prediction": [28.98788833618164]}
{"id": [49], "prediction": [23.7033748626709]}
{"id": [50], "prediction": [66.570556640625]}
{"id": [51], "prediction": [43.24263381958008]}
{"id": [58], "prediction": [21.677234649658203]}
{"id": [59], "prediction": [61.81406784057617]}
{"id": [61], "prediction": [92.8854751586914]}
{"id": [62], "prediction": [3.1058220863342285]}
{"id": [63], "prediction": [6.285392761230469]}
{"id": [64], "prediction": [13.756722450256348]}
{"id": [65], "prediction": [12.875457763671875]}
{"id": [66], "prediction": [2.6651928424835205]}
{"id": [67], "prediction": [65.68929290771484]}
{"id": [68], "prediction": [86.72750091552734]}
{"id": [69], "prediction": [36.203392028808594]}
{"id": [70], "prediction": [25.729507446289062]}
{"id": [71], "prediction": [45.26877212524414]}
{"id": [72], "prediction": [35.32212448120117]}
{"id": [73], "prediction": [81.55843353271484]}
{"id": [75], "prediction": [-3.9326603412628174]}
{"id": [77], "prediction": [48.17616653442383]}
{"id": [78], "prediction": [5.83697509765625]}
{"id": [79], "prediction": [-16.515302658081055]}
{"id": [80], "prediction": [46.41364669799805]}
{"id": [81], "prediction": [99.6681137084961]}
{"id": [82], "prediction": [103.10275268554688]}
{"id": [83], "prediction": [40.51226043701172]}
{"id": [85], "prediction": [25.033048629760742]}
{"id": [86], "prediction": [21.067378997802734]}
{"id": [87], "prediction": [22.55849838256836]}
{"id": [89], "prediction": [32.43030548095703]}
{"id": [91], "prediction": [29.16413688659668]}
{"id": [92], "prediction": [38.95634841918945]}
{"id": [93], "prediction": [45.26877212524414]}
{"id": [95], "prediction": [66.570556640625]}
{"id": [96], "prediction": [21.677234649658203]}
{"id": [98], "prediction": [19.651098251342773]}
{"id": [101], "prediction": [14.901592254638672]}
{"id": [102], "prediction": [15.782853126525879]}
{"id": [104], "prediction": [34.177249908447266]}
{"id": [105], "prediction": [60.93279266357422]}
{"id": [106], "prediction": [47.559303283691406]}
{"id": [107], "prediction": [18.953866958618164]}
{"id": [108], "prediction": [10.857108116149902]}
{"id": [109], "prediction": [26.69814109802246]}
{"id": [110], "prediction": [40.071632385253906]}
{"id": [111], "prediction": [13.588262557983398]}
{"id": [112], "prediction": [72.72856140136719]}
{"id": [113], "prediction": [73.16919708251953]}
{"id": [114], "prediction": [14.909379959106445]}
{"id": [115], "prediction": [61.82184600830078]}
{"id": [116], "prediction": [46.685054779052734]}
{"id": [117], "prediction": [-2.4290108680725098]}
{"id": [118], "prediction": [-1.9065239429473877]}
{"id": [119], "prediction": [8.303742408752441]}
{"id": [120], "prediction": [33.295989990234375]}
{"id": [122], "prediction": [57.768802642822266]}
{"id": [123], "prediction": [-12.895105361938477]}
You've now completed all phases of the Beam-based pipeline! Similar processes are done under the hood by higher-level frameworks such as TFX, and you can use the techniques here to understand their codebases better or to extend them for your own needs. As mentioned earlier, the original article also offers the option to use GCP and to perform online predictions. Feel free to try it out, but be aware of the associated costs.
On to the next part of the course!