Dagster Architecture, Object
1. Dagster Architecture, Object
![[Figure 1] Dagster Architecture](/blog-software/docs/theory-analysis/dagster-architecture-object/images/dagster-architecture.png)
[Figure 1] Dagster Architecture
[Figure 1] shows the Dagster Architecture. It can be divided into the Control Plane, where user-defined workflows live, and the Data Plane, where workflows actually run. Dagster provides various types of Objects for composing workflows, and users combine these Objects to build their workflows.
1.1. Code Location
Most of the Dagster Objects used in workflows are defined and used in a Code Location in the Control Plane. That is, users define Dagster Objects and register them in a Code Location, and Dagster composes and executes workflows from the registered Objects. The Dagster Objects defined and used in a Code Location are as follows.
1.1.1. Op, Job
Op represents the smallest unit of action executed in a workflow. Workflows can be composed by combining these Ops. In Airflow terms, a Task corresponds to an Op in Dagster. Job represents a single workflow and can contain one or more Ops.
[Code 1] shows an example of an Op and a Job. Six action functions are defined (generate_numbers, filter_even_numbers, filter_odd_numbers, sum_even_numbers, sum_odd_numbers, sum_two_numbers) and marked as Ops through the @op decorator. A process_numbers function is also defined and marked as a Job through the @job decorator; inside the Job function, the defined Ops are called in DAG form. The decorators can also carry various metadata such as a Description or Tags.
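A minimal sketch of [Code 1] is shown below; the Op and Job names follow the description above, while the function bodies and metadata values are illustrative assumptions.

```python
import dagster as dg

@dg.op(description="Generate a list of numbers")  # metadata via decorator
def generate_numbers():
    return list(range(10))

@dg.op
def filter_even_numbers(numbers):
    return [n for n in numbers if n % 2 == 0]

@dg.op
def filter_odd_numbers(numbers):
    return [n for n in numbers if n % 2 == 1]

@dg.op
def sum_even_numbers(numbers):
    return sum(numbers)

@dg.op
def sum_odd_numbers(numbers):
    return sum(numbers)

@dg.op
def sum_two_numbers(a, b):
    return a + b

@dg.job(tags={"group": "numbers"})  # metadata via decorator
def process_numbers():
    # Calling Ops inside the Job body wires them into a DAG
    numbers = generate_numbers()
    even_sum = sum_even_numbers(filter_even_numbers(numbers))
    odd_sum = sum_odd_numbers(filter_odd_numbers(numbers))
    sum_two_numbers(even_sum, odd_sum)
```

[Code 1] Dagster Op, Job Example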
![[Figure 2] Dagster Op, Job Example](/blog-software/docs/theory-analysis/dagster-architecture-object/images/dagster-op-job-example.png)
[Figure 2] Dagster Op, Job Example
[Figure 2] shows the Op and Job from [Code 1] viewed in Dagster’s Web Console, where the Ops called within the Job function appear connected in DAG form. Dagster recommends composing workflows around Data rather than Actions, that is, around the Assets introduced next rather than around Ops. Ops are therefore used for actions that are hard to model as Assets, such as Slack or e-mail notifications, or for splitting up the work when a single Asset would otherwise require too many actions.
1.1.2. Asset
Asset refers to data created during the workflow process. Not only the final output of an ETL process but also the intermediate data created along the way can be defined as Assets. That is, a workflow can be understood as a process of data transformation rather than a sequence of actions, and Asset is the Dagster Object used for this view.
[Code 2] shows an example of Assets. Six Asset functions are defined (generated_numbers, filtered_even_numbers, filtered_odd_numbers, summed_even_numbers, summed_odd_numbers, summed_two_numbers) and marked as Assets through the @asset decorator. They play the same role as the Ops in [Code 1] but are centered on Data rather than Action; note that the Asset names are passive forms (generated_numbers rather than generate_numbers) because they name the data itself.
The syntactic difference from Ops is that an Asset receives the Assets it depends on as parameters. The parameters of the filtered_even_numbers and filtered_odd_numbers Assets are named generated_numbers, meaning they take the generated_numbers Asset as input. Similarly, the summed_two_numbers Asset takes the summed_even_numbers and summed_odd_numbers Assets as inputs.
That is, dependencies between Assets are expressed through Asset parameters, and they naturally form a DAG. The define_asset_job function converts these Assets into a single Job; its selection parameter specifies which Assets to include, and [Code 2] selects the Assets belonging to the numbers group.
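A minimal sketch of [Code 2] follows; the Asset names and the numbers group follow the description above, while the function bodies are illustrative assumptions.

```python
import dagster as dg

@dg.asset(group_name="numbers")
def generated_numbers():
    return list(range(10))

@dg.asset(group_name="numbers")
def filtered_even_numbers(generated_numbers):
    # The parameter name expresses the dependency on generated_numbers
    return [n for n in generated_numbers if n % 2 == 0]

@dg.asset(group_name="numbers")
def filtered_odd_numbers(generated_numbers):
    return [n for n in generated_numbers if n % 2 == 1]

@dg.asset(group_name="numbers")
def summed_even_numbers(filtered_even_numbers):
    return sum(filtered_even_numbers)

@dg.asset(group_name="numbers")
def summed_odd_numbers(filtered_odd_numbers):
    return sum(filtered_odd_numbers)

@dg.asset(group_name="numbers")
def summed_two_numbers(summed_even_numbers, summed_odd_numbers):
    return summed_even_numbers + summed_odd_numbers

# Convert the Assets in the "numbers" group into a single Job
process_numbers_asset = dg.define_asset_job(
    name="process_numbers_asset",
    selection=dg.AssetSelection.groups("numbers"),
)
```

[Code 2] Dagster Asset Example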
![[Figure 3] Dagster Asset Example](/blog-software/docs/theory-analysis/dagster-architecture-object/images/dagster-asset-example.png)
[Figure 3] Dagster Asset Example
[Figure 3] shows the Assets from [Code 2] viewed in Dagster’s Web Console. The Assets appear in DAG form, so the Asset lineage is expressed naturally. For Assets, the execution process is called Materialization.
1.1.3. External Resource
External Resource refers to the various external resources supported by Dagster. Mainly I/O Managers, external data storage, and BI tools are defined and used as External Resources. Among them, the I/O Manager is especially important because it handles data transfer between Ops or Assets. I/O Managers support various backends; the main ones are as follows.
- FilesystemIOManager : Stores data in the local filesystem. If no I/O Manager is specified separately, it operates as the default I/O Manager.
- InMemoryIOManager : Stores data in local memory.
- S3PickleIOManager : Stores data in Pickle format on AWS S3.
- GCSPickleIOManager : Stores data in Pickle format on GCP GCS.
- BigQueryPandasIOManager : Stores data in Pandas DataFrame format on BigQuery.
- BigQueryPySparkIOManager : Stores data in PySpark DataFrame format on BigQuery.
[Code 3] shows an example of defining an I/O Manager as an External Resource. The example uses S3PickleIOManager as the I/O Manager and also defines the S3 backend itself as an External Resource. The configured resources are collected in a Python dictionary.
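A minimal sketch of [Code 3], assuming the Pythonic resource API of dagster-aws; the region, bucket name, and prefix are placeholder assumptions.

```python
from dagster_aws.s3 import S3PickleIOManager, S3Resource

# S3 defined as an External Resource, used as the I/O Manager's backend
s3 = S3Resource(region_name="ap-northeast-2")  # placeholder region

resources = {
    "s3": s3,
    # I/O Manager that stores Op/Asset outputs as Pickle objects on S3
    "io_manager": S3PickleIOManager(
        s3_resource=s3,
        s3_bucket="my-dagster-bucket",  # placeholder bucket
        s3_prefix="io-manager",         # placeholder prefix
    ),
}
```

[Code 3] Dagster External Resource (I/O Manager) Example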
I/O Manager is designed to easily transfer relatively small-sized data and is not designed to quickly transfer very large data of several tens of TB or more through parallel processing. Therefore, when transferring large data, it is effective to store the data in external storage first and then pass the path where the data is stored through the I/O Manager. The I/O Managers that can be used may be limited depending on the Run Launcher or Executor that determines how Ops or Assets are executed.
1.1.4. Schedule
A Schedule periodically executes workflows using cron-format syntax. [Code 4] shows an example of defining Schedules that execute the process_numbers Job defined in [Code 1] and the process_numbers_asset Job defined in [Code 2] every minute.
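A minimal sketch of [Code 4], assuming ScheduleDefinition Objects referencing the two Jobs above; the variable names are illustrative.

```python
import dagster as dg

# Run the Op-based Job every minute
process_numbers_schedule = dg.ScheduleDefinition(
    job=process_numbers,
    cron_schedule="* * * * *",
)

# Run the Asset-based Job every minute
process_numbers_asset_schedule = dg.ScheduleDefinition(
    job=process_numbers_asset,
    cron_schedule="* * * * *",
)
```

[Code 4] Dagster Schedule Example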
1.1.5. Sensor
A Sensor executes workflows based on external conditions. [Code 5] shows an example of a Sensor that checks whether the /check file exists: if the file exists, it executes the process_numbers Job; if not, it skips the run. The Sensor is defined through the dg.sensor decorator, and the Job to execute is specified through the job parameter. Sensors operate by polling, and the minimum_interval_seconds parameter specifies the polling interval.
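A minimal sketch of [Code 5]; the Sensor name and the 30-second polling interval are illustrative assumptions.

```python
import os
import dagster as dg

# Poll for the /check file; minimum_interval_seconds is the polling interval
@dg.sensor(job=process_numbers, minimum_interval_seconds=30)
def check_file_sensor():
    if os.path.exists("/check"):
        # File exists: request a run of the process_numbers Job
        yield dg.RunRequest(run_key=None)
    else:
        # File missing: skip, recording the reason in the Web Console
        yield dg.SkipReason("/check file does not exist")
```

[Code 5] Dagster Sensor Example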
1.1.6. Definitions
Definitions registers all of the Objects used by Dagster. [Code 6] shows an example of a Definitions Object that includes the Objects defined in [Code 1 ~ 5]. Objects not registered in Definitions are not recognized by Dagster, so every Object must be registered in Definitions.
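A minimal sketch of [Code 6], collecting the Objects from the earlier sketches; the variable names follow those sketches.

```python
import dagster as dg

defs = dg.Definitions(
    assets=[
        generated_numbers, filtered_even_numbers, filtered_odd_numbers,
        summed_even_numbers, summed_odd_numbers, summed_two_numbers,
    ],
    jobs=[process_numbers, process_numbers_asset],
    schedules=[process_numbers_schedule, process_numbers_asset_schedule],
    sensors=[check_file_sensor],
    resources=resources,  # the External Resource dictionary from [Code 3]
)
```

[Code 6] Dagster Definitions Example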
1.2. Dagster Instance
The Dagster Instance is an Object that holds all configuration for the Dagster Control Plane; internally it manages this configuration as a dagster.yaml file. All Components of the Control Plane read their configuration from the Dagster Instance. [File 1] shows an example of a Dagster Instance.
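A minimal sketch of [File 1], assuming PostgreSQL-backed Storages, the QueuedRunCoordinator, the K8sRunLauncher, and the S3ComputeLogManager; hostnames, credentials, namespaces, and bucket names are placeholders.

```yaml
# Storage for Runs, Events, and Schedules (see 1.3)
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db: &pg          # placeholder connection info
      hostname: dagster-postgres
      username: dagster
      password: dagster
      db_name: dagster
      port: 5432

event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db: *pg

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db: *pg

# Workflow scheduling (see 1.4)
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

run_launcher:
  module: dagster_k8s
  class: K8sRunLauncher
  config:
    job_namespace: dagster    # placeholder namespace

# Execution log storage (see 1.5)
compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: my-dagster-bucket # placeholder bucket
    prefix: compute-logs
```

[File 1] Dagster Instance (dagster.yaml) Example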
1.3. Dagster Database
The Database serves as Run Storage, Event Storage, and Schedule Storage, and all Components of the Dagster Control Plane access it. The configuration for the Database and each Storage can be found in the Dagster Instance ([File 1]).
- Run Storage : A single Run represents a single triggered workflow, and Run Storage stores the state of such Runs, that is, Run metadata such as the workflow's current state and execution results.
- Event Storage : Stores the Events that occur during workflow execution.
- Schedule Storage : Stores workflow schedule information.
1.4. Dagster Workflow Trigger
The Dagster Web Server, Dagster CLI, and Dagster Daemon are the Components that trigger workflows. All three reference the Code Location and Dagster Instance when triggering, and the state of triggered workflows is stored in the Database according to the Storages configured in the Dagster Instance.
Users trigger workflows directly through the Dagster Web Server or CLI, while the Dagster Daemon triggers Jobs through the Schedule and Sensor Objects that users define in the Code Location. A triggered workflow is scheduled by the Run Coordinator, a Run is created by the Run Launcher, and within the Run the Executor runs the Ops or Assets one by one to carry out the workflow.
A single Run represents a single triggered workflow; when the workflow ends, the Run responsible for it ends as well. The Run acts as the actual Control Plane of the workflow, executing Ops or Assets sequentially in DAG order through the Executor. Runs are created by the Run Launcher. Dagster provides several types of Run Launchers and Executors, and how a workflow executes depends on which ones are configured. A single Dagster Cluster can use only the one Run Launcher Type configured in the Dagster Instance ([File 1]), whereas an Executor can be configured per workflow (Job). [Code 7] shows an example of setting an Executor through the executor_def parameter of the Job.
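A minimal sketch of [Code 7], assuming the multiprocess_executor; the Job name is illustrative and the Ops are those from [Code 1].

```python
import dagster as dg

# The Executor is configured per Job through the executor_def parameter
@dg.job(executor_def=dg.multiprocess_executor)
def process_numbers_parallel():
    numbers = generate_numbers()
    sum_two_numbers(
        sum_even_numbers(filter_even_numbers(numbers)),
        sum_odd_numbers(filter_odd_numbers(numbers)),
    )
```

[Code 7] Dagster Executor Example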
The Run Launcher Types supported by Dagster are as follows.
- K8sRunLauncher : Run is executed as a Kubernetes Job (Pod).
- EcsRunLauncher : Run is executed as an AWS ECS Task.
- DockerRunLauncher : Run is executed as a Docker Container.
- CeleryK8sRunLauncher : Run is executed as a Kubernetes Job (Pod) using Celery.
The main Executors supported by Dagster are as follows.
- in_process_executor : Ops/Assets are executed sequentially within a single process.
- multiprocess_executor : Ops/Assets are executed in parallel within multiple processes.
- celery_executor : Ops/Assets are executed in parallel using Celery.
- docker_executor : Ops/Assets are executed in parallel using Docker Containers.
- k8s_job_executor : Ops/Assets are executed in parallel using Kubernetes Jobs.
- celery_k8s_job_executor : Ops/Assets are executed in parallel using Celery and Kubernetes Jobs.
Run Coordinator performs workflow scheduling and is configured in the Dagster Instance ([File 1]). The Run Coordinators supported by Dagster are as follows.
- DefaultRunCoordinator : When a workflow creation request comes in, it immediately calls the Run Launcher to create a Run. Used in Dagster Web Server and Dagster CLI.
- QueuedRunCoordinator : When a workflow creation request comes in, it stores the request in a queue and then retrieves it according to rules to create a Run. Used in Dagster Daemon. If configured to use QueuedRunCoordinator, Dagster Web Server does not directly process workflow creation requests but passes them to Dagster Daemon.
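As a sketch of how the QueuedRunCoordinator's queueing rules might be configured in the Dagster Instance (the limit values and the tag are illustrative assumptions):

```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 10      # at most 10 Runs in progress at once
    tag_concurrency_limits:      # per-tag concurrency rules
      - key: "database"
        value: "bigquery"
        limit: 2                 # at most 2 concurrent Runs with this tag
```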
The Dagster Daemon is not an essential Component for operating Dagster. Without it, Schedule Objects, Sensor Objects, and the QueuedRunCoordinator cannot be used, but workflow execution itself is unaffected.
1.5. Compute Log
Compute Log stores the execution logs of the Ops and Assets run in Dagster. The Compute Log Manager is configured in the Dagster Instance ([File 1]). The Compute Log Managers supported by Dagster are as follows.
- LocalComputeLogManager : Stores Compute Log in the local filesystem.
- NoOpComputeLogManager : Does not store Compute Log.
- S3ComputeLogManager : Stores Compute Log on AWS S3.
- AzureComputeLogManager : Stores Compute Log on Azure Blob Storage.
- GCSComputeLogManager : Stores Compute Log on Google Cloud Storage.
2. References
- Dagster Architecture : https://docs.dagster.io/guides/deploy/oss-deployment-architecture
- Dagster Concepts : https://docs.dagster.io/getting-started/concepts
- Dagster Code Location : https://dagster.io/blog/dagster-code-locations
- Dagster Internals : https://docs.dagster.io/api/python-api/internals
- Dagster Run Launcher : https://docs.dagster.io/guides/deploy/execution/run-launchers
- Dagster Executor : https://docs.dagster.io/guides/operate/run-executors