This chapter covers Cloud Composer, Google Cloud's managed workflow orchestration service built on Apache Airflow. For the ACE exam, you must understand how to deploy, configure, and manage Cloud Composer environments, including DAGs, operators, connections, and monitoring. Approximately 10-15% of exam questions touch on orchestration and automation, with Cloud Composer being the primary service tested. This chapter provides the depth needed to answer scenario-based questions about scheduling, dependencies, retries, and integration with other GCP services.
Imagine a factory assembly line where multiple robots and workers perform tasks in a specific order to build a product. The factory manager (Cloud Composer) doesn't do the actual work but oversees the process. The manager receives a blueprint (DAG) that lists every step—like 'fetch parts,' 'assemble frame,' 'add engine'—and the dependencies between them. The manager creates a work order for each product (DAG run) and assigns tasks to workers (operators). Workers can be human (Python functions) or specialized machines (e.g., a welding robot = Dataproc cluster). The manager uses a bulletin board (Airflow metadata database) to track each task's status: queued, running, success, or failed. If a task fails, the manager can retry it up to a set number of times or notify a supervisor (alert). The manager also ensures that no two tasks that require the same resource (e.g., a specific robot) run at the same time (pool). The factory floor has limited space (worker concurrency) and the manager can schedule work only during certain shifts (start_date, schedule_interval). The entire factory runs inside a secure facility (VPC) with controlled access (IAM). Just as a factory manager can be replaced without stopping the line, Cloud Composer can be upgraded or scaled without affecting running workflows—new runs use the new version.
What is Cloud Composer and Why Does It Exist?
Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow. It allows you to create, schedule, monitor, and manage workflows (also called pipelines) that span multiple Google Cloud services, on-premises systems, or third-party APIs. Workflows are defined as Directed Acyclic Graphs (DAGs) written in Python. Each DAG defines a set of tasks and their dependencies. Cloud Composer handles the underlying infrastructure: it provisions a Kubernetes cluster, installs Airflow, manages scaling, and provides upgrades and patches. The ACE exam tests your ability to deploy environments, write DAGs, use operators, configure connections, and monitor execution.
How Cloud Composer Works Internally
Cloud Composer environments consist of several components:
- Airflow Scheduler: Monitors all DAGs and tasks, triggers DAG runs based on schedule, and submits tasks to the worker queue.
- Airflow Workers: Execute the tasks defined in DAGs. Workers are stateless and pull tasks from the queue.
- Airflow Web Server: Provides the Airflow UI for monitoring, triggering, and debugging DAGs.
- Cloud SQL (Airflow Database): Stores metadata about DAGs, DAG runs, task instances, variables, and connections. Composer 2 uses a Cloud SQL PostgreSQL instance. Task logs themselves are written to the environment's bucket and to Cloud Logging.
- Cloud Storage (DAGs and plugins): You upload DAG files and plugins to a specific Cloud Storage bucket (e.g., gs://us-central1-myenv-xxxx-bucket). The scheduler and workers sync from this bucket.
- Task queue (Redis): Holds queued task instances until a worker picks them up.
- Monitoring: Cloud Monitoring metrics and logs are automatically integrated.
When you upload a DAG file to the dags/ folder in the environment's bucket, the scheduler parses it and updates the metadata database. At the scheduled time, the scheduler creates a DAG run and starts queuing tasks. Workers pick up tasks, execute them, and report status back to the database.
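The queue-and-execute flow above can be modelled in plain Python. This is a minimal sketch, not Airflow code: it uses the standard library's `graphlib` to show which tasks become ready once their upstream tasks finish. The task ids are hypothetical.

```python
from graphlib import TopologicalSorter


def simulate_dag_run(dependencies):
    """Return batches of task ids in the order a scheduler could queue them.

    `dependencies` maps each task id to the set of upstream task ids it waits on.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstreams have all finished
        batches.append(ready)               # these could be queued at the same time
        sorter.done(*ready)                 # model the workers reporting success
    return batches


# Hypothetical pipeline: extract >> transform >> [load_bq, export_gcs]
example = {
    "extract": set(),
    "transform": {"extract"},
    "load_bq": {"transform"},
    "export_gcs": {"transform"},
}
batches = simulate_dag_run(example)
```

Each inner list is a group of tasks that have no unmet dependencies, which is why `load_bq` and `export_gcs` land in the same batch and may run in parallel.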
Key Components, Values, Defaults, and Timers
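Environment size: Small, Medium, Large. In Composer 2 this sets the capacity of the managed infrastructure, such as the Airflow database, rather than a fixed node count; workers scale separately. Default is Small. (A default of 3 nodes applies to Composer 1 environments.)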
Software version: Cloud Composer versions correspond to Airflow versions (e.g., Composer 2.x uses Airflow 2.x). The exam focuses on Composer 2 (Airflow 2).
Default Airflow configuration: parallelism (max task instances running at once per scheduler) default is 32; dag_concurrency (max concurrent tasks per DAG, renamed max_active_tasks_per_dag in Airflow 2.2) default is 16; max_active_runs_per_dag default is 16.
Retries: Default retries is 0; can be set per task. Default retry_delay is 5 minutes.
Schedule interval: Defined using cron expressions or Airflow presets (@daily, @hourly). Default is None (manual only).
DAGs bucket: Auto-generated bucket with folders: dags/, plugins/, data/, logs/.
IAM roles: roles/composer.worker for DAG execution, roles/composer.environmentAndStorageObjectViewer for read-only access.
Environment deletion: Deletes the environment's GKE cluster and Airflow database. The environment's Cloud Storage bucket is not deleted automatically, so DAGs, data, and logs remain until you delete the bucket yourself.
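To see how the concurrency defaults listed above combine, here is a simplified sketch. It assumes only two limits apply (environment-wide parallelism and the per-DAG task limit) and ignores pools and worker capacity; the function name is hypothetical.

```python
def free_task_slots(parallelism=32, dag_concurrency=16,
                    running_total=0, running_in_dag=0):
    """How many more task instances of one DAG could start right now.

    parallelism: limit on running task instances across all DAGs.
    dag_concurrency: limit on running task instances within a single DAG.
    """
    env_free = max(parallelism - running_total, 0)
    dag_free = max(dag_concurrency - running_in_dag, 0)
    # The tighter of the two limits decides.
    return min(env_free, dag_free)
```

With the defaults and nothing running, a single DAG can start at most 16 tasks even though the environment allows 32.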
Configuration and Verification Commands
You interact with Cloud Composer primarily via gcloud composer commands and the Console.
Create an environment:
gcloud composer environments create my-env \
    --location us-central1 \
    --image-version composer-2.5.0-airflow-2.5.1
Upload a DAG:
gcloud composer environments storage dags import \
    --environment my-env --location us-central1 \
    --source ./my_dag.py
List DAGs (Airflow 2 CLI syntax; Airflow 1 used list_dags):
gcloud composer environments run my-env \
    --location us-central1 dags list
Trigger a DAG (Airflow 2 CLI syntax; Airflow 1 used trigger_dag):
gcloud composer environments run my-env \
    --location us-central1 dags trigger -- my_dag_id
View logs (through Cloud Logging):
gcloud logging read 'resource.type="cloud_composer_environment" AND resource.labels.environment_name="my-env"' \
    --limit 20
Update environment:
gcloud composer environments update my-env \
    --location us-central1 \
    --update-env-variables=KEY=VALUE
How It Interacts with Related Technologies
Cloud Storage: DAGs, plugins, and data files are stored in Cloud Storage. Output data can be written to Cloud Storage.
BigQuery: Use BigQueryInsertJobOperator to run queries, BigQueryToGCSOperator to export.
Dataproc: Use DataprocCreateClusterOperator, DataprocSubmitJobOperator, DataprocDeleteClusterOperator for Spark/Hadoop jobs.
Cloud Dataflow: Use DataflowStartFlexTemplateOperator or DataflowTemplatedJobStartOperator.
Cloud Functions: Use FunctionClient or CloudFunctionInvokeFunctionOperator.
Pub/Sub: Use PubSubPublishMessageOperator, PubSubPullOperator.
Kubernetes: Use KubernetesPodOperator to run containers on GKE.
Vertex AI (formerly AI Platform): Use the Vertex AI operators, such as CreateCustomTrainingJobOperator and DeployModelOperator.
Cloud SQL: Can connect to Cloud SQL via MySqlOperator or PostgresOperator using connections.
DAG Writing Essentials for the Exam
A DAG is a Python script that defines the DAG structure. Key components:
- Default arguments: owner, depends_on_past, email, email_on_failure, email_on_retry, retries, retry_delay, start_date, end_date.
- DAG object: dag = DAG(dag_id='my_dag', default_args=default_args, schedule_interval='@daily', catchup=False).
- Operators: Instantiate operators with task_id, dag, and operator-specific parameters.
- Dependencies: Use >> or set_downstream(): task1 >> task2 >> [task3, task4].
- Sensors: Wait for a condition (e.g., PubSubPullSensor, BigQueryTableExistenceSensor).
- TaskFlow API: Decorators @dag, @task for Python-native pipelines.
Example minimal DAG:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# Arguments applied to every task unless a task overrides them.
default_args = {
    'owner': 'me',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,  # do not backfill runs between start_date and today
)

# First task: print the current date on the worker.
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

# Second task: sleep for five seconds.
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    dag=dag,
)

# t2 runs only after t1 succeeds.
t1 >> t2
Environment Management and Scaling
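Scaling: Composer 2 environments autoscale workers based on load. You set the minimum and maximum worker count with --min-workers and --max-workers on gcloud composer environments update. Airflow-level limits such as parallelism are a separate setting and are changed through Airflow configuration overrides (--update-airflow-configs), not through the worker count.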
Upgrades: You can upgrade the Airflow version within the same major version. Use gcloud composer environments update --image-version.
Private IP: Environments can be deployed in a private network (VPC) with no public IP for enhanced security.
Web server access control: By default, the Airflow UI is accessible via IAP (Identity-Aware Proxy). You can also set it to public or private.
Monitoring and Logging
Cloud Monitoring: Metrics like composer.googleapis.com/environment/healthy, task_instance_duration, dag_processing_duration.
Cloud Logging: Logs from scheduler, workers, and web server are sent to Cloud Logging. You can create log-based metrics.
Airflow UI: Provides real-time status of DAG runs, task instances, and logs. You can trigger DAGs, clear tasks, and mark success/failure.
Alerts: Set up Cloud Monitoring alerts on failed tasks, long-running tasks, or environment health.
Create a Cloud Composer Environment
Use the Google Cloud Console or `gcloud composer environments create` to provision a new environment. You must specify a location (e.g., `us-central1`), an image version (e.g., `composer-2.5.0-airflow-2.5.1`), and optionally set node count, machine type, network, and environment variables. The service creates a GKE cluster, Cloud SQL instance, and Cloud Storage bucket. This process takes 20-30 minutes. The environment is ready when the status is 'RUNNING'.
Write and Upload a DAG File
Write a Python DAG script that defines your workflow. The DAG must be placed in the `dags/` folder of the environment's Cloud Storage bucket. You can upload using `gcloud composer environments storage dags import` or via the Console. The scheduler rescans the folder for new files every few minutes (default `dag_dir_list_interval` = 300 seconds). If the DAG is valid, it appears in the Airflow UI. A file that fails to parse or import is reported as an import error in the Airflow UI, and its DAG is not loaded.
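Since a file that does not parse never shows up as a DAG, it can help to check the syntax locally before uploading. The following is a minimal sketch using only the standard library; it catches syntax errors but not missing imports or a missing DAG object. The function name and sample strings are hypothetical.

```python
import ast


def check_dag_source(source: str, filename: str = "my_dag.py"):
    """Return None if the source parses, otherwise a short error description."""
    try:
        ast.parse(source, filename=filename)
    except SyntaxError as err:
        return f"{filename}:{err.lineno}: {err.msg}"
    return None


good = "from datetime import datetime\nstart = datetime(2024, 1, 1)\n"
bad = "dag = DAG('example_dag'\n"  # missing closing parenthesis

good_result = check_dag_source(good)
bad_result = check_dag_source(bad)
```

A check like this could run in a CI step before `gcloud composer environments storage dags import`, so that broken files never reach the bucket.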
Configure Connections and Variables
Set up connections to external services (e.g., BigQuery, Dataproc, Cloud SQL) in Airflow UI under Admin > Connections or via environment variables. Use `airflow connections add` or the UI to define connection IDs, types, and credentials. For GCP services, a default connection `google_cloud_default` is automatically created using the environment's service account. Variables can be set via UI, CLI, or environment variables (`AIRFLOW_VAR_*`).
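Airflow reads variables from environment variables named `AIRFLOW_VAR_<KEY>` and connections from `AIRFLOW_CONN_<CONN_ID>`, where a connection value is written as a URI. The sketch below builds those names and a URI; the connection id, host, and credentials are hypothetical placeholders.

```python
from urllib.parse import quote


def variable_env_name(key: str) -> str:
    """Environment variable name Airflow checks for a Variable."""
    return f"AIRFLOW_VAR_{key.upper()}"


def connection_env(conn_id, conn_type, host, login, password, schema="", port=None):
    """Return (env_name, uri) for a connection supplied through the environment."""
    # Percent-encode credentials so characters like '@' or '/' do not break the URI.
    netloc = f"{quote(login, safe='')}:{quote(password, safe='')}@{host}"
    if port is not None:
        netloc += f":{port}"
    uri = f"{conn_type}://{netloc}/{quote(schema, safe='')}"
    return f"AIRFLOW_CONN_{conn_id.upper()}", uri


# Hypothetical values for illustration only.
name, uri = connection_env(
    "reporting_db", "postgres", "10.0.0.5", "etl", "p@ss/word", "sales", 5432
)
```

Percent-encoding matters here: an unescaped `@` in the password would otherwise be read as the start of the host.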
Trigger and Monitor DAG Execution
DAGs run automatically based on `schedule_interval`. You can also manually trigger from the Airflow UI or via `gcloud composer environments run <env> --location <loc> dags trigger -- <dag_id>`. The scheduler creates a DAG run and queues tasks. Workers execute tasks, updating the metadata database. Monitor progress in the UI: the Grid view (Tree view in older Airflow versions) shows task status (queued, running, success, failed, skipped). Logs for each task instance are available in the UI and Cloud Logging.
Handle Failures and Retries
If a task fails, Airflow can retry up to `retries` times with a delay of `retry_delay`. The task goes to 'up_for_retry' state. After exhausting retries, it becomes 'failed'. You can manually clear a task instance to re-run it. Use `depends_on_past` to stop a task from running when the same task did not succeed in the previous DAG run. Set `email_on_failure` to get notifications. In production, configure Cloud Monitoring alerts for failed tasks.
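The retry behaviour described above can be traced with a small model. This is a sketch of the state transitions only, not Airflow's implementation, and it leaves out the `retry_delay` wait between attempts.

```python
def task_states(attempt_results, retries=0):
    """Return the state recorded after each attempt of one task instance.

    attempt_results: iterable of booleans, True when an attempt succeeds.
    retries: extra attempts allowed after the first one (Airflow's default is 0).
    """
    states = []
    for attempt_number, succeeded in enumerate(attempt_results, start=1):
        if succeeded:
            states.append("success")
            break
        if attempt_number <= retries:
            states.append("up_for_retry")  # another attempt is still allowed
        else:
            states.append("failed")        # retries exhausted
            break
    return states
```

With `retries=2` a task gets three attempts in total: the original run plus two retries.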
Scale and Update the Environment
As workload grows, you can scale the environment by adjusting the worker count or worker resources. In Composer 2, use `gcloud composer environments update` with `--min-workers` and `--max-workers` to set the autoscaling bounds, or `--worker-cpu` and `--worker-memory` to resize workers. You can also change Airflow configuration through overrides (`--update-airflow-configs`). To upgrade the Airflow version, use `--image-version` pointing to a newer supported version. Environment updates may delay work briefly: tasks already running continue on existing workers, but new tasks may wait until the update finishes.
Enterprise Scenario 1: ETL Pipeline for Data Warehouse
A large retail company uses Cloud Composer to orchestrate daily ETL jobs. Every night, a DAG runs that:
1. Extracts data from on-premises databases using JdbcOperator or CloudDataFusion.
2. Loads raw data into Cloud Storage.
3. Runs a Dataproc Spark job to transform data.
4. Loads transformed data into BigQuery.
5. Triggers a Dataflow pipeline for aggregations.
6. Sends a success/failure notification via Pub/Sub to a Slack webhook.
The environment is configured with max_active_runs_per_dag=1 to avoid overlapping runs. The DAG uses depends_on_past=True to ensure data consistency. Retries are set to 2 with a 10-minute delay. In production, they encountered issues when the Dataproc cluster creation failed due to quota limits; they added a sensor to wait for cluster availability. Performance: the pipeline processes 500 GB per night; the environment uses Medium size with 5 workers.
Enterprise Scenario 2: Machine Learning Pipeline
A fintech startup uses Cloud Composer to automate ML model retraining. The DAG:
1. Checks for new training data in BigQuery.
2. Runs a Dataflow job to preprocess features.
3. Submits a Vertex AI training job.
4. Deploys the model to an endpoint if accuracy > threshold.
5. Sends model metrics to Cloud Monitoring.
They use BranchPythonOperator to conditionally deploy. The environment is private IP for security, with IAP access for the Airflow UI. They raise the Airflow [core] parallelism option to 64 through an Airflow configuration override to handle many concurrent tasks. A common misconfiguration: forgetting to set catchup=False causes backfilling of all missed DAG runs when a new DAG is added, overwhelming resources.
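BranchPythonOperator calls a Python function and follows the task whose `task_id` the function returns. A minimal sketch of such a function for the conditional deployment step is shown here; the task ids and the 0.90 threshold are hypothetical.

```python
def choose_deploy_branch(accuracy: float, threshold: float = 0.90) -> str:
    """Return the task_id that should run next.

    In a real DAG the accuracy would typically be read from an upstream task
    (for example through XCom) rather than passed in directly.
    """
    if accuracy > threshold:
        return "deploy_model"  # hypothetical task id for the deployment task
    return "skip_deploy"       # hypothetical task id for the no-op branch
```

Keeping the decision in a plain function like this makes it easy to unit test without starting Airflow.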
Scenario 3: Multi-cloud Workflow
A media company orchestrates workflows across AWS and GCP. Cloud Composer manages cross-cloud dependencies using SimpleHttpOperator to trigger AWS Lambda functions and S3KeySensor to wait for files in S3 (via a custom plugin). Connections store AWS credentials. They use Airflow pools to limit concurrent external API calls to avoid rate limiting. Monitoring is done via Cloud Logging and custom metrics. Pitfall: if the credentials used by a task lack permission to access the external resources, the task fails and the cause is only visible in its logs; they run gcloud composer environments run <env> --location <loc> dags list to verify DAG parsing, and check task logs for errors.
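The effect of a pool on the external API calls can be illustrated with a small simulation. This sketch assumes every task needs one slot and tasks are served first come, first served; real Airflow also considers priority weights. The function name is hypothetical.

```python
import heapq


def run_with_pool(task_durations, slots):
    """Simulate tasks that each need one pool slot; return {task_id: start_time}.

    task_durations: dict of task_id -> duration in arbitrary time units,
    processed in insertion order. slots: number of slots in the pool.
    """
    free_at = [0] * slots  # time at which each slot next becomes free
    heapq.heapify(free_at)
    starts = {}
    for task_id, duration in task_durations.items():
        start = heapq.heappop(free_at)  # take the slot that frees up first
        starts[task_id] = start
        heapq.heappush(free_at, start + duration)
    return starts


starts = run_with_pool({"a": 3, "b": 3, "c": 3, "d": 3}, slots=2)
```

With two slots, `a` and `b` start immediately while `c` and `d` wait until a slot is released, so no more than two calls are in flight at once.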
The ACE exam tests Cloud Composer under objective 3.3 'Deploy and implement workflow orchestration using Cloud Composer.' Focus on:
DAG Basics: Know how to define a DAG with schedule_interval, start_date, catchup, and default args. The exam loves catchup=False to prevent backfilling. Common wrong answer: setting catchup=True when you want to skip past runs.
Operators: You must know which operator to use for a given service. For example, BigQueryInsertJobOperator for BigQuery queries, DataprocSubmitJobOperator for Spark jobs, DataflowTemplatedJobStartOperator for Dataflow. Wrong answer: using BashOperator to run bq command instead of the native operator.
Connections: The default google_cloud_default connection uses the environment's service account. You can override with a different service account key. Exam may ask: 'How to connect to a Cloud SQL database?' Answer: create a connection with type 'MySQL' or 'Postgres' and provide host, schema, login, password.
Environment Management: Commands like gcloud composer environments create, update, delete. Know that deleting an environment removes the GKE cluster and the Airflow database but leaves the environment's Cloud Storage bucket in place; the bucket must be deleted separately. Wrong answer: thinking the bucket is removed along with the environment.
Scheduling and Dependencies: Understand depends_on_past, trigger_rule (e.g., all_done, one_failed). Exam trap: a task that should run even if upstream fails should use trigger_rule='all_done'.
Error Handling: Retries, retry_delay, email_on_failure. Exam may ask: 'A task fails, how many times will it retry by default?' Answer: 0 (no retries). Common wrong answer: 3.
Airflow UI: Know how to access it (via IAP URL), and that you can manually trigger DAGs, view logs, and clear tasks.
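The trigger_rule behaviour mentioned under Scheduling and Dependencies can be summarised in a few lines. This is a simplified model covering three rules only, not Airflow's own evaluation code.

```python
def should_run(trigger_rule, upstream_states):
    """Decide whether a task may run given the states of its upstream tasks."""
    finished = {"success", "failed", "skipped", "upstream_failed"}
    if trigger_rule == "all_success":  # Airflow's default rule
        return all(s == "success" for s in upstream_states)
    if trigger_rule == "all_done":     # run once everything upstream has finished
        return all(s in finished for s in upstream_states)
    if trigger_rule == "one_failed":   # run as soon as any upstream task has failed
        return any(s in ("failed", "upstream_failed") for s in upstream_states)
    raise ValueError(f"rule not modelled in this sketch: {trigger_rule}")
```

A cleanup task that must run regardless of upstream outcome fits `all_done`, while an alerting task that should fire on the first failure fits `one_failed`.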
Edge Cases:
- DAGs must be valid Python; otherwise they won't appear in UI.
- If you change schedule_interval after DAG has run, old runs are not affected.
- start_date must be in the past for scheduled runs to trigger.
- Environment variables can be set at environment creation or update.
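The interaction between start_date and catchup can be worked through with a small model of a daily schedule. It is a sketch under the assumption that a run for a given day becomes due only once that day's interval has ended; the function name is hypothetical.

```python
from datetime import datetime, timedelta


def daily_runs_to_create(start_date, now, catchup):
    """Return the logical dates of @daily runs a newly enabled DAG would get.

    A run for the interval starting at `d` becomes due once `d + 1 day` has passed.
    """
    due = []
    day = start_date
    while day + timedelta(days=1) <= now:
        due.append(day)
        day += timedelta(days=1)
    if catchup:
        return due       # backfill every missed interval
    return due[-1:]      # only the most recent completed interval


now = datetime(2024, 1, 4, 12, 0)
with_catchup = daily_runs_to_create(datetime(2024, 1, 1), now, catchup=True)
without_catchup = daily_runs_to_create(datetime(2024, 1, 1), now, catchup=False)
future_start = daily_runs_to_create(datetime(2024, 2, 1), now, catchup=True)
```

A start_date a year in the past with catchup enabled would produce hundreds of runs at once, which is the resource problem catchup=False avoids. A start_date in the future produces none.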
To eliminate wrong answers, focus on the mechanism: Airflow uses a scheduler that parses DAGs, creates runs, and queues tasks. Workers execute them. Metadata is stored in Cloud SQL. Any answer that contradicts this flow (e.g., 'tasks are executed by the web server') is wrong.
Cloud Composer is managed Apache Airflow; DAGs are Python files stored in Cloud Storage.
Default DAG schedule_interval is None (manual trigger); use cron or presets like @daily.
Set catchup=False to avoid backfilling past DAG runs on new DAGs.
Default retries per task is 0; set retries in default_args or per task.
Use gcloud composer environments run <env> --location <loc> dags trigger -- <dag_id> to manually trigger (Airflow 2 syntax).
The default google_cloud_default connection uses the environment's service account.
Delete environment: removes the GKE cluster and the Airflow database; the environment's bucket is kept and must be deleted separately.
Environment scaling: in Composer 2, set worker bounds with --min-workers and --max-workers; tune Airflow limits such as [core] parallelism through Airflow configuration overrides.
These come up on the exam all the time. Here's how to tell them apart.
Cloud Composer (Airflow)
Workflows defined as Python DAGs, allowing complex logic and conditionals.
Supports hundreds of operators for GCP and external services.
Managed infrastructure (GKE cluster) with autoscaling.
Ideal for long-running, stateful workflows with retries and sensors.
Higher latency for task execution (seconds to minutes).
Cloud Workflows
Workflows defined as YAML steps (HTTP requests, subworkflows).
Steps call HTTP endpoints or built-in connectors for Google Cloud APIs; no Airflow-style operators.
Serverless, no infrastructure to manage.
Best for lightweight, event-driven or API-driven orchestration of services.
Low latency (milliseconds) for step execution.
Mistake
Cloud Composer is a serverless service with no infrastructure to manage.
Correct
Cloud Composer is managed, but it runs on a GKE cluster. You don't manage nodes directly, but you must choose machine types, node counts, and update versions. It is not serverless; it provisions underlying VMs.
Mistake
DAGs are executed in the order they are defined in the Python file.
Correct
DAGs define dependencies using `>>` or `set_downstream()`. Tasks without dependencies may run in parallel. The order of definition does not determine execution order.
Mistake
The default service account has access to all GCP services.
Correct
The default service account is the Compute Engine default service account, which has `editor` role on the project. But you can (and should) use a custom service account with minimal permissions. The exam expects you to know that you can attach a service account.
Mistake
Deleting an environment always deletes the Cloud Storage bucket.
Correct
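Deleting an environment removes the GKE cluster and the Airflow database, but the environment's Cloud Storage bucket is left in place so DAGs and logs are not lost. Delete the bucket separately if you no longer need it. The exam may test this detail.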
Mistake
You can run any Python library in a DAG without installation.
Correct
Workers have a pre-installed set of Python packages. You can install additional packages from PyPI into the environment. Use `gcloud composer environments update` with `--update-pypi-package` (a single package) or `--update-pypi-packages-from-file` (a requirements file).
Use `gcloud composer environments update <env> --location <loc> --update-pypi-package package1==version`. For several packages, list them in a requirements.txt file and pass it with `--update-pypi-packages-from-file`. Uploading a requirements file to the plugins/ folder does not install anything. The packages are installed as part of an environment update, so the change can take several minutes to complete.
`catchup` (boolean) determines whether Airflow creates DAG runs for intervals between the start_date and the current date that were missed. `depends_on_past` (boolean) ensures that a task instance only runs if the previous task instance (same task, previous schedule) succeeded. Use `catchup=False` to avoid backfilling; use `depends_on_past=True` for sequential dependencies.
Create an Airflow connection of type 'MySQL' or 'Postgres' with the database host, schema, login, and password. For Cloud SQL, you can use the Cloud SQL proxy or private IP. In Composer 2, you can use the Cloud SQL connection operator directly. Ensure the environment's service account has the Cloud SQL Client role.
Common reasons: (1) The DAG file has a syntax error—check logs in Cloud Logging for 'DAG parsing' errors. (2) The DAG is not in the `dags/` folder—it must be in the top-level `dags/` directory. (3) The DAG object is not defined in the global scope—it must be assigned to a variable at module level. (4) The scheduler hasn't parsed yet—wait up to 5 minutes (default `dag_dir_list_interval`).
Use the Airflow REST API. In Airflow 2 environments the stable REST API is enabled by default, and Cloud Composer supplies its own authentication backend. Make an HTTP POST request to the endpoint `/api/v1/dags/{dag_id}/dagRuns` on the Airflow web server URL, with a JSON body that can carry an optional `conf` object. Authenticate with credentials for a user or service account that has IAM access to the environment. Alternatively, use `gcloud composer environments run` from a script.
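The POST request to `/api/v1/dags/{dag_id}/dagRuns` might be assembled as in the sketch below. It only builds the request object with the standard library and does not send it. The web server URL and token are hypothetical placeholders, and how the bearer token is obtained is outside this sketch.

```python
import json
from urllib.request import Request


def build_trigger_request(web_server_url, dag_id, token, conf=None):
    """Build (but do not send) a request that asks Airflow to start a DAG run."""
    body = json.dumps({"conf": conf or {}}).encode("utf-8")
    return Request(
        url=f"{web_server_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",  # placeholder credential
        },
    )


# Hypothetical URL and token for illustration only.
req = build_trigger_request(
    "https://airflow.example.invalid/", "example_dag", "TOKEN", {"run_mode": "manual"}
)
```

Passing `req` to `urllib.request.urlopen` would send it; in practice a Google auth library session is a common way to attach valid credentials.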
Yes. When creating the environment, pass `--enable-private-environment` and specify the VPC network and subnetwork (`--network`, `--subnetwork`). The environment's cluster nodes will have no public IP addresses. You must configure Cloud NAT for outbound internet access if needed. The Airflow web server can be accessed via IAP. This is a common exam scenario for security.
By default, Airflow allows multiple DAG runs to overlap, up to the `max_active_runs_per_dag` configuration value (default 16). To prevent overlapping, set `max_active_runs=1` in the DAG definition, or set `max_active_runs_per_dag` to 1 for the whole environment. If a run is still active when the next scheduled time arrives, the new run will be queued until the previous one completes.
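The queuing effect of limiting active runs can be seen in a small simulation. This is a simplified model in which every run takes the same time and waiting runs start in schedule order; the function and parameter names are hypothetical.

```python
def run_start_times(schedule_times, run_duration, max_active_runs=1):
    """Return when each scheduled run actually starts.

    A run starts at its scheduled time unless `max_active_runs` earlier runs
    are still active, in which case it waits for one of them to finish.
    """
    end_times = []  # end time of every run started so far
    starts = []
    for scheduled in schedule_times:
        active = sorted(e for e in end_times if e > scheduled)
        if len(active) >= max_active_runs:
            # Wait until enough earlier runs have finished to free one slot.
            start = active[len(active) - max_active_runs]
        else:
            start = scheduled
        starts.append(start)
        end_times.append(start + run_duration)
    return starts


serial = run_start_times([0, 10, 20], run_duration=15, max_active_runs=1)
overlapping = run_start_times([0, 10, 20], run_duration=15, max_active_runs=2)
```

With a limit of one, a run that takes longer than the schedule interval pushes every later run back. With a limit of two, the same schedule runs on time but with overlap.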