Dataflow Flex templates and how to use them

If you’re here, I know you couldn’t find a solid solution elsewhere. I can understand all your frustration in solving this problem with Flex templates. Our team tried to fix it for a couple of days with the help of some GCP consultants. In the end, we managed to fix it!

You can read this article on Medium as well.

What is the problem?

For those who don’t know what I’m talking about, it’s about “How to add multiple source code files into a GCP Dataflow Flex Template”.

Before going into that, I must explain what those terms are.

What is Dataflow?

Google Cloud Dataflow is a fully managed service for executing Apache Beam pipelines within the Google Cloud Platform ecosystem.

Wikipedia

It’s used to process high volumes of data in the cloud through streaming pipelines, in contrast to Dataproc, which processes batches of data using Apache Spark.

What is a Template?

You can submit a Dataflow job to the cluster, but you have to specify the source code location, the parameters and the other configurations (region, worker types, etc.) every time you do so. That gets tedious when you want to build a proper production pipeline. Also, Apache Beam has to determine the flow (the DAG) before it can execute the code, and that takes time.

So, we need to create a template out of it! A template is simply a definition of the pipeline, including placeholders for all the parameters and configurations it can take. When we run the template, we override those placeholders with the intended values.
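To give you an idea, here is roughly what such a parameter placeholder looks like in a Python Beam pipeline. This is the ValueProvider mechanism used by Classic Templates (which we’ll get to in a moment); the --input option and the steps are made up for illustration, not taken from our pipeline.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # A ValueProvider argument stays a placeholder in the template
        # and only gets a concrete value when the template is run.
        parser.add_value_provider_argument('--input', type=str,
                                           help='GCS path to read from')

options = MyOptions()
with beam.Pipeline(options=options) as p:
    p | 'Read' >> beam.io.ReadFromText(options.input) | 'Print' >> beam.Map(print)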

There are two types of templates:

  1. Classic Templates
    We run the pipeline locally (with a special flag) to generate the template, and it is saved to a file in a GCS bucket.
  2. Flex Templates
    The pipeline is packaged into a Docker image (then pushed to GCR), and the Flex template spec is saved to a file in a GCS bucket.

I won’t bore you to death by explaining everything here. You can read about Templates in Dataflow in this article.

The biggest problem with Classic Templates is that the DAG is pre-determined when you create the template. This means that even though we can pass different values into the Beam transforms (steps), we can’t change the number of parameters.

my_list = get_a_dynamic_list_of_paths()  # length only known at run time
output = p | "Step1" >> beam.Create(my_list)
#              This doesn't work in a Classic Template.   ^

Example: If you have a parameter for the input location of your files, you can pass one and only one value to it. If you need to pass multiple locations, the pipeline will fail, because when you generate the template, Beam scans the code, determines the number of inputs passed into each step, and stores that in the template file.

So, there should be a mechanism to generate the DAG dynamically, and that’s where the new Flex Templates come into the picture.
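To make that concrete, here is a rough sketch of the kind of pipeline a Flex Template makes possible. The --input_paths option and the step names are hypothetical, not from our actual pipeline; the point is that the DAG is built only at launch time, so it can fan out into any number of read steps.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input_paths', required=True,
                            help='Comma-separated list of GCS paths')

options = MyOptions()
paths = options.input_paths.split(',')  # any number of paths, known only at launch time

with beam.Pipeline(options=options) as p:
    # One read step per path: the shape of the DAG depends on the parameter value.
    pcolls = [p | f'Read{i}' >> beam.io.ReadFromText(path)
              for i, path in enumerate(paths)]
    merged = tuple(pcolls) | 'Merge' >> beam.Flatten()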

Creating a Classic Template

It’s the same as running the pipeline, except that we pass an extra parameter, --template_location, with the location where we want to save the template.

Note: --template true is an internal parameter which we use to load some parameter values from an internal module.

Example make recipe.

template:
    PYTHONPATH=./src python src/dataflow/main.py \
      --region $(REGION) \
      --runner DataflowRunner \
      --project $(PROJECT) \
      --template true \
      --staging_location gs://$(TEMP_BUCKET)/staging \
      --network $(NETWORK)   \
      --machine_type n1-standard-4 \
      --setup_file src/dataflow/setup.py \
      --save_main_session true \
      --temp_location gs://$(TEMP_BUCKET)/temp \
      --template_location gs://$(TEMPLATE_BUCKET)/Tmpl_v1
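Once the template is generated, you can run it from the console, with gcloud dataflow jobs run, or programmatically. Below is a minimal sketch of the programmatic route using the Dataflow REST API client; the project, region, bucket and parameter names are all placeholders, so adjust them to your setup.

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

credentials = GoogleCredentials.get_application_default()
dataflow = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)

request = dataflow.projects().locations().templates().launch(
    projectId='my-project',                      # placeholder
    location='europe-west1',                     # placeholder
    gcsPath='gs://my-template-bucket/Tmpl_v1',   # the --template_location used above
    body={
        'jobName': 'my-classic-job',
        'parameters': {'input': 'gs://my-bucket/input/*.csv'},  # your template's parameters
    },
)
response = request.execute()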

Creating a Flex template

Creating and running a Flex Template involves a few more steps.

If you follow the official documentation, there is a high chance that you will fail, especially if you have multiple source code files in your pipeline, like normal people do!

Folder structure

IMPORTANT

It’s very important to include the setup file in the Dockerfile, and it MUST contain the setuptools.find_packages() call. This makes sure all of your source code files are added to the PYTHONPATH so you can import them as modules. Otherwise, your Dataflow job will keep complaining that it can’t find your modules:

No module named <whatever>

All the sample code and tutorials show a single Python file, and that works without any problem. But if you need to have multiple source code files (for readability or re-usability), those instructions won’t work. That’s why you need to add all the files into the Dockerfile along with the setup.py file.

You can decide where you want the requirements to be defined. Some people define them in a requirements.txt and install them inside the Dockerfile, but it’s easier to define them in the setup.py file, since we need it anyway.
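For reference, such a setup.py can be as minimal as the sketch below; the package name, version and pinned requirement are placeholders, so adjust them to your project.

import setuptools

setuptools.setup(
    name='my-dataflow-pipeline',        # placeholder
    version='0.1.0',
    # find_packages() is the important bit: it picks up every package
    # (folders with an __init__.py) so the workers can import your modules.
    packages=setuptools.find_packages(),
    install_requires=[
        'apache-beam[gcp]==2.28.0',     # pin to the Beam version your template image uses
    ],
)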

You also need an image_spec.json file pointing to the GCR image name where the pipeline image is deployed.

python_command_spec.json defines the main file to be executed.

Then, the template_metadata file contains the metadata used to validate the parameters we provide when executing the pipeline. (Here is a sample.)

Well, it’s just the beginning. Here is how you can deploy them.

Deploy the Flex template

First, you need to build the template file and upload it to GCS.

export TEMPLATE_PATH="gs://${TEMPLATE_BUCKET}/my-logic/python_command_spec.json"
export TEMPLATE_IMAGE="gcr.io/${PROJECT_ID}/my-logic:latest"

gcloud dataflow flex-template build ${TEMPLATE_PATH} \
  --image "${TEMPLATE_IMAGE}" \
  --sdk-language "PYTHON" \
  --metadata-file spec/template_metadata

This will compile your metadata file into a spec file and upload it to the given template path.

Then, you need to build the template as a Docker image. For this purpose, Google recommends using Cloud Build (gcloud builds submit) instead of docker build.

gcloud builds submit --project=${PROJECT_ID} --tag ${TEMPLATE_IMAGE}

This will create the Docker image that launches your Dataflow pipeline. The image will be pulled by a launcher VM to determine/generate the DAG based on the parameters we pass when we start the Dataflow job.

Running the pipeline

You can obviously read the corresponding section of the official documentation to learn how to run it, but that only shows how to trigger the pipeline from the shell or via a REST API call.

However, if you need to trigger it programmatically (i.e. from your Python code), you can use the piece of code below.

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

credentials = GoogleCredentials.get_application_default()
# cache_discovery should be set to False to avoid errors
dataflow = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)

request = dataflow.projects().locations().flexTemplates().launch(
    projectId=project_id,
    location=dataflow_region,
    body={
        'launch_parameter': {
            'jobName': 'my-job-name',
            'parameters': params_dict,                  # dict of template parameters
            'containerSpecGcsPath': template_location,  # the spec file built earlier
            'subnetwork': subnetwork,                   # subnet URI to run Dataflow in
            'setup_file': '/dataflow/template/setup.py'
        }
    }
)

response = request.execute()

parameters should contain all the parameters your template expects, as defined in the metadata file.

containerSpecGcsPath should be the GCS path where you uploaded the spec file in the flex-template build command, i.e. TEMPLATE_PATH.
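For clarity, here is an example of what those placeholder values could look like; everything below is made up and should be replaced with your own values.

project_id = 'my-project'
dataflow_region = 'europe-west1'

# The TEMPLATE_PATH you used in the flex-template build command
template_location = 'gs://my-template-bucket/my-logic/python_command_spec.json'

# Full URI of the subnet to run Dataflow in
subnetwork = 'https://www.googleapis.com/compute/v1/projects/my-project/regions/europe-west1/subnetworks/my-subnet'

# Must match the parameters declared in template_metadata
params_dict = {
    'input_path': 'gs://my-bucket/input/*.csv',
    'output_table': 'my_dataset.my_table',
}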

I will write a separate article on how we can orchestrate multiple Dataflow jobs using a Cloud Function (without Cloud Composer). But this is it for this article.

Summary

If you want a proper, reusable Dataflow pipeline, you should define it as a template. Because Classic templates don’t support a dynamic number of parameters, we can use Flex templates, which generate the DAG at runtime.

Flex template code is deployed as a Docker image, and you can include all your source code files and other dependencies in it.

However, the official documentation does NOT show how to have multiple files in your source code, and if you try to do things blindly, it will fail. For this, you need to pass a setup.py file describing the installation steps for your package. It MUST include the setuptools.find_packages() call for it to detect your source code files as modules. This setup.py is triggered on a launcher VM along with your pipeline (as a template) to determine the DAG before it’s deployed to the workers.

Overall, I find Flex templates to be a better option than Classic templates, as they allow your code to accept a dynamic number of parameters, which are passed only when you run the pipeline.

With Classic templates, the template generation happens on the local machine (or whatever build environment you use), which MUST have all the requirements/libraries installed, because Beam actually parses the source code to determine the DAG. With Flex templates, the DAG is generated in the cloud (on the launcher VM) at runtime, so you don’t need the requirements installed locally.

On the other hand, it takes additional time to generate the DAG whenever you launch your pipeline. You can see this launcher VM in the Compute Engine section. So, if your pipeline doesn’t benefit from the features of Flex templates, you’re better off with Classic templates. However, Google might deprecate Classic templates in the near future, so it’s a safe bet to migrate to Flex templates. And it’s not hard either.

Huge thanks to Miguel Florido and Akshay Apte for spending hours finding solutions to the problems mentioned above.

Thanks for reading and here is a cookie.

8 comments on “Dataflow Flex templates and how to use them”

  • Marco Mistroni

    Hello
    great article but i am having issues when i run gcloud build.
    From which directory shall i run it?
    i have same setup as you outlined.
    When i run the command from within the pipeline directory i am getting this
    Would you know what am i doing wrong?
    regards Marco

    Step 4/11 : COPY pipeline ${WORKDIR}/pipeline
    COPY failed: file not found in build context or excluded by .dockerignore: stat pipeline: file does not exist
    ERROR
    ERROR: build step 0 “gcr.io/cloud-builders/docker” failed: step exited with non-zero status: 1

    • Praneeth Peiris

      Hi,
      I run the command outside of pipeline folder. Then it should pick it up in the COPY command.

  • Marco Mistroni

    Sure thanks, But then i got another problem which i cannot figure out

    2021-05-20 21:31:31.010 BST Failed to read the job file: gs://dataflow-staging-us-central1-682143946483/staging/template_launches/2021-05-20_13_29_04-16162799362443715124/job_object with error message: (a47f706568665b89): Unable to open template file: gs://dataflow-staging-us-central1-682143946483/staging/template_launches/2021-05-20_13_29_04-16162799362443715124/job_object..

    So, after i launched gcloud build and kicked off the job from the Dataflow page in the GCP console (by selecting the json template), it didn’t work out.
    I am wondering if it is an issue with the way i kicked off the job
    regards

    • Praneeth Peiris

      Double-check if you have the job template properly uploaded to gs://dataflow-staging-us-central1-682143946483/staging/template_launches/2021-05-20_13_29_04-16162799362443715124/job_object.
      Sometimes it won’t if your service account doesn’t have access to upload to GCS.
      Also, you can check if the GCR images are properly uploaded. Then if that IAM role has permission to read from the templates bucket. And also note your dataflow job is in us-central1 and your buckets should be either global or in that region.

      • Marco Mistroni

        Hey
        how do you import the modules in the pipeline package from your main?
        I keep on getting error where it cannot find any modules in my subdirectory

        kind regards

        • Praneeth Peiris

          Did you add them to the Dockerfile? Also, do you have a `__init__.py` inside your pipeline folder?

          • Marco Mistroni

            Right, so after tinkering for a while, i figured out that Beam 2.27 ignores
            ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
            i found an old thread of your mate Akshay Apte where it was suggested to add a setup_file parameter when running the template, and it worked

            rgds
