Dagster Evaluation

Is dagster a good fit for a data science pipeline?
Published

June 3, 2021

I’ve been looking at different pipeline frameworks for data science in production. The aim is to be able to write code freely in small chunks that are then composed into a pipeline. Doing this lets the pipeline take care of things like monitoring the performance of individual parts or tracking progress.

I had previously looked into celery for this, but it was a poor fit. Celery works well when a continuously running service submits tasks to it. The ideal pipeline would work well in a batch setting as well as a continuous one, and celery seems poorly suited to batch work.

Looking into the different frameworks available, the two standouts appear to be dagster and metaflow. When I assess GitHub projects I look at the stars, the last commit date, and the number of issues. If there are plenty of stars and issues and the last commit is recent, then the project is in good shape. It’s worth looking at a few projects to get an idea of what counts as a large number. To do that I found this awesome pipeline list.

Project Metrics

project   stars  open issues  closed issues  last commit
dagster   3.4k   581          2,359          7 hours ago
metaflow  4.4k   119          159            2 days ago

To justify these metrics a little, here is why I chose them. The stars show how popular a project is. A popular project will have more people using it, and so will have more resources (blog posts, Stack Overflow answers, etc.) on using it well or fixing common issues.

The number of issues is another proxy for popularity: a popular project will get more issues even if it is bug free. Any problem complex enough to need a framework leaves room for confusion, and some bug reports are really support requests. The closed issue count is a sign of project stewardship. Even if all of the issues are closed without fixing anything, that at least means someone is paying attention to them.
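As a rough illustration of the stewardship point, the share of issues that have been closed can be computed from the counts in the table. This ratio is my own addition and not one of the metrics listed above. The numbers are a point-in-time snapshot:

```python
# Issue counts copied from the Project Metrics table (a point-in-time snapshot).
issue_counts = {
    "dagster": {"open": 581, "closed": 2359},
    "metaflow": {"open": 119, "closed": 159},
}


def closed_fraction(open_issues: int, closed_issues: int) -> float:
    """Fraction of all filed issues that have been closed (0.0 if there are none)."""
    total = open_issues + closed_issues
    return closed_issues / total if total else 0.0


for project, counts in issue_counts.items():
    fraction = closed_fraction(counts["open"], counts["closed"])
    print(f"{project}: {fraction:.0%} of issues closed")
```

This prints about 80% for dagster and 57% for metaflow. Like the raw counts, it is only a proxy, because issues can be closed without being fixed.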

The last commit is the most direct measure of project activity.

These metrics are not perfect. At best they give a rough idea of how much work it might take to fit a framework to your task.

Task Suitability

What I need from this framework is something that is:

  • Easy to use
  • Easy to test
  • Easy to scale

The aim is to encourage the other members of my team to use this framework, both to make their lives easier and to let the model work well in production. The very first code example shown for dagster seems to meet this:

from dagster import pipeline, solid


@solid
def get_name():
    return "dagster"


@solid
def hello(context, name):
    context.log.info(f"Hello, {name}!")


@pipeline
def hello_pipeline():
    hello(get_name())

What I’m hoping here is that the team can write their code in small units which are then appropriately annotated. The composition of them should then be reasonably straightforward.
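To make the idea concrete without depending on dagster, here is a toy sketch of "annotated small units". It uses a hypothetical `step` decorator that is not part of dagster. The decorator records how long each unit takes, which is the kind of per-part bookkeeping a pipeline framework can take over. The composition line mirrors `hello(get_name())` from the dagster example:

```python
import time
from functools import wraps

# Hypothetical registry of per-step timings; a real framework tracks far more.
TIMINGS = {}


def step(fn):
    """Toy decorator: run the wrapped function and record its duration by name."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            TIMINGS[fn.__name__] = time.perf_counter() - start
    return wrapper


@step
def get_name():
    return "dagster"


@step
def hello(name):
    return f"Hello, {name}!"


# Composition reads like ordinary function calls, as in hello(get_name()).
message = hello(get_name())
print(message)          # Hello, dagster!
print(sorted(TIMINGS))  # ['get_name', 'hello']
```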

Another positive point about dagster is that it works very well with pandas dataframes. I like pandas a lot as it does a great deal of work under the hood to make computations performant.

One thing this code does not show is how much detail goes into the annotations once you get further into the documentation. The documentation does cover things I am interested in: testing and scaling. To determine how easy it is to use, I guess I’ll just have to try it out.


Logging

Dagster produces a lot of output at the DEBUG level. I’ll run one example to show this:

from dagster import pipeline, solid, execute_pipeline
import requests
import csv

@solid
def hello_cereal(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    cereals = [row for row in csv.DictReader(lines)]
    context.log.info(f"Found {len(cereals)} cereals")

    return cereals

@pipeline
def hello_cereal_pipeline():
    hello_cereal()

execute_pipeline(hello_cereal_pipeline)
2021-06-18 11:24:21 - dagster - DEBUG - hello_cereal_pipeline - 65fda56e-063f-45b5-a5eb-ce789f84df22 - 216204 - PIPELINE_START - Started execution of pipeline "hello_cereal_pipeline".
2021-06-18 11:24:21 - dagster - DEBUG - hello_cereal_pipeline - 65fda56e-063f-45b5-a5eb-ce789f84df22 - 216204 - ENGINE_EVENT - Executing steps in process (pid: 216204)
2021-06-18 11:24:21 - dagster - DEBUG - hello_cereal_pipeline - 65fda56e-063f-45b5-a5eb-ce789f84df22 - 216204 - hello_cereal - ENGINE_EVENT - Starting initialization of resources [io_manager].
2021-06-18 11:24:21 - dagster - DEBUG - hello_cereal_pipeline - 65fda56e-063f-45b5-a5eb-ce789f84df22 - 216204 - hello_cereal - ENGINE_EVENT - Finished initialization of resources [io_manager].
2021-06-18 11:24:21 - dagster - DEBUG - hello_cereal_pipeline - 65fda56e-063f-45b5-a5eb-ce789f84df22 - 216204 - hello_cereal - LOGS_CAPTURED - Started capturing logs for solid: hello_cereal.
2021-06-18 11:24:21 - dagster - DEBUG - hello_cereal_pipeline - 65fda56e-063f-45b5-a5eb-ce789f84df22 - 216204 - hello_cereal - STEP_START - Started execution of step "hello_cereal".
2021-06-18 11:24:21 - dagster - INFO - system - 65fda56e-063f-45b5-a5eb-ce789f84df22 - hello_cereal - Found 77 cereals
2021-06-18 11:24:21 - dagster - DEBUG - hello_cereal_pipeline - 65fda56e-063f-45b5-a5eb-ce789f84df22 - 216204 - hello_cereal - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-06-18 11:24:21 - dagster - DEBUG - hello_cereal_pipeline - 65fda56e-063f-45b5-a5eb-ce789f84df22 - 216204 - hello_cereal - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2021-06-18 11:24:21 - dagster - DEBUG - hello_cereal_pipeline - 65fda56e-063f-45b5-a5eb-ce789f84df22 - 216204 - hello_cereal - STEP_SUCCESS - Finished execution of step "hello_cereal" in 207ms.
2021-06-18 11:24:21 - dagster - DEBUG - hello_cereal_pipeline - 65fda56e-063f-45b5-a5eb-ce789f84df22 - 216204 - ENGINE_EVENT - Finished steps in process (pid: 216204) in 212ms
2021-06-18 11:24:21 - dagster - DEBUG - hello_cereal_pipeline - 65fda56e-063f-45b5-a5eb-ce789f84df22 - 216204 - PIPELINE_SUCCESS - Finished execution of pipeline "hello_cereal_pipeline".
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b81bdd820>

This can be quite useful when debugging, but it obscures the output that I want to see. I can adjust the logging level by using run_config when executing the pipeline:

@solid
def hello_cereal(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    cereals = [row for row in csv.DictReader(lines)]
    context.log.info(f"Found {len(cereals)} cereals")

    return cereals

@pipeline
def hello_cereal_pipeline():
    hello_cereal()

execute_pipeline(
    hello_cereal_pipeline,
    run_config={
        "loggers": {
            "console": {"config": {"log_level": "INFO"}}
        }
    }
)
2021-06-18 11:24:21 - dagster - INFO - system - 9ba28087-f66e-4f20-850e-b2ce2aff38ce - hello_cereal - Found 77 cereals
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b99516d00>
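The `log_level` value in this run_config appears to act like an ordinary Python logging threshold. The sketch below shows that behaviour with the standard `logging` module only, not dagster's logger: once a logger is set to INFO, DEBUG records are dropped and INFO records still come through.

```python
import logging


class ListHandler(logging.Handler):
    """Collects messages in a list instead of printing them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


logger = logging.getLogger("pipeline_demo")
logger.propagate = False  # keep these records away from the root logger
handler = ListHandler()
logger.addHandler(handler)

# Similar in spirit to {"log_level": "INFO"}: anything below INFO is dropped.
logger.setLevel(logging.INFO)
logger.debug('STEP_START - Started execution of step "hello_cereal".')
logger.info("Found 77 cereals")

print(handler.messages)  # ['Found 77 cereals']
```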

I want to use this throughout the post, so I am going to wrap the configuration up in a variable.

CONFIG = {
    "loggers": {
        "console": {"config": {"log_level": "INFO"}}
    }
}
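If a run also needs solid-level configuration, the logger section would otherwise have to be repeated. The helper below layers extra settings onto `CONFIG` without mutating it. `with_solid_config` and the `threshold` key are hypothetical names for illustration. The `{"solids": {name: {"config": ...}}}` layout follows the run_config shape dagster used at the time for per-solid config.

```python
import copy

CONFIG = {
    "loggers": {
        "console": {"config": {"log_level": "INFO"}}
    }
}


def with_solid_config(base, solid_name, solid_config):
    """Return a copy of `base` with config added for one solid (hypothetical helper)."""
    merged = copy.deepcopy(base)  # copy first so the shared CONFIG is never mutated
    merged.setdefault("solids", {})[solid_name] = {"config": solid_config}
    return merged


run_config = with_solid_config(CONFIG, "pipeline_input", {"threshold": 10})
print(run_config["solids"])  # {'pipeline_input': {'config': {'threshold': 10}}}
print("solids" in CONFIG)    # False
```

The solid config is attached after the deep copy. That means a large value such as a DataFrame placed in `solid_config` is not copied.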

Tutorial

I’m going to work through the tutorial as a way to evaluate dagster.

import requests
import csv
from dagster import pipeline, solid, execute_pipeline


@solid
def hello_cereal(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    cereals = [row for row in csv.DictReader(lines)]
    context.log.info(f"Found {len(cereals)} cereals")

    return cereals

@pipeline
def hello_cereal_pipeline():
    hello_cereal()

execute_pipeline(hello_cereal_pipeline, run_config=CONFIG)
2021-06-18 11:24:21 - dagster - INFO - system - b39f5876-96e8-4fa6-a810-cbb943224c4a - hello_cereal - Found 77 cereals
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b995164f0>

This is the first step of the tutorial, and it just executes a single solid. I find it odd that the logging message is not parameterized, and reading the CSV by hand is a bit primitive considering pandas exists. These are small things though.
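One concrete reason the hand-rolled CSV read is primitive is that `csv.DictReader` returns every field as a string. Here is a stdlib-only sketch of converting numeric fields on read. The `convert` helper is my own, and the two sample rows reuse sugar values that appear later in this post.

```python
import csv
import io

SAMPLE = "name,sugars\nNut&Honey Crunch,9\nSmacks,15\n"


def convert(value):
    """Best-effort conversion: try int, then float, else keep the string."""
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value


raw_rows = list(csv.DictReader(io.StringIO(SAMPLE)))
typed_rows = [{key: convert(value) for key, value in row.items()} for row in raw_rows]

print(raw_rows[0]["sugars"], type(raw_rows[0]["sugars"]).__name__)      # 9 str
print(typed_rows[0]["sugars"], type(typed_rows[0]["sugars"]).__name__)  # 9 int
```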

import csv

import requests
from dagster import pipeline, solid, execute_pipeline


@solid
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@solid
def find_sugariest(context, cereals):
    sorted_by_sugar = sorted(cereals, key=lambda cereal: cereal["sugars"])
    context.log.info(f'{sorted_by_sugar[-1]["name"]} is the sugariest cereal')


@pipeline
def serial_pipeline():
    find_sugariest(download_cereals())

execute_pipeline(serial_pipeline, run_config=CONFIG)
2021-06-18 11:24:22 - dagster - INFO - system - dcfd3c10-daa7-4849-b500-441873f81326 - find_sugariest - Nut&Honey Crunch is the sugariest cereal
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b81abc3d0>

The pipeline determines that Nut&Honey Crunch is the sugariest cereal.

Now that it is actually doing some work I’m going to try converting this to using pandas.

import pandas as pd
from dagster import pipeline, solid, execute_pipeline


@solid
def download_cereals():
    return pd.read_csv("https://docs.dagster.io/assets/cereal.csv")


@solid
def find_sugariest(context, cereals):
    sorted_by_sugar = cereals.sort_values(by="sugars", ascending=False)
    sugariest_name = (
        sorted_by_sugar.head(1)
            .name
            .item()
    )
    context.log.info(f'{sugariest_name} is the sugariest cereal')


@pipeline
def serial_pipeline():
    find_sugariest(download_cereals())

execute_pipeline(serial_pipeline, run_config=CONFIG)
2021-06-18 11:24:22 - dagster - INFO - system - 9beb23ad-f4be-47b7-a011-5e4efadcbada - find_sugariest - Smacks is the sugariest cereal
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b42d7ef40>

What is interesting here is that this returns a different result. The original pipeline reported Nut&Honey Crunch, while the new pipeline has Smacks as the sugariest. Let’s review them.

import pandas as pd

df = pd.read_csv("https://docs.dagster.io/assets/cereal.csv")
df[df.name.isin({"Nut&Honey Crunch", "Smacks"})][["name", "sugars"]]
                name  sugars
48  Nut&Honey Crunch       9
66            Smacks      15

So Smacks is the sugariest. The error comes from the plain csv reading code, which reads the sugars column as strings. Strings sort lexicographically, character by character, so "9" sorts after "15" because "9" comes after "1".
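The effect is easy to reproduce with plain strings. This sketch uses the two sugar values from the table above. Converting the sort key with `int` restores numeric ordering; that fix is my own, not the tutorial's.

```python
cereals = [
    {"name": "Smacks", "sugars": "15"},
    {"name": "Nut&Honey Crunch", "sugars": "9"},
]

# String comparison looks at the first character: "9" > "1", so "9" > "15".
by_text = sorted(cereals, key=lambda cereal: cereal["sugars"])
print(by_text[-1]["name"])  # Nut&Honey Crunch

# Converting to int compares the numbers themselves.
by_number = sorted(cereals, key=lambda cereal: int(cereal["sugars"]))
print(by_number[-1]["name"])  # Smacks
```

The same problem affects any multi-digit numeric column that is read this way.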

The next step of the tutorial turns this into a DAG.

@solid
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@solid
def find_highest_calorie_cereal(cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal["calories"])
    )
    return sorted_cereals[-1]["name"]


@solid
def find_highest_protein_cereal(cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal["protein"])
    )
    return sorted_cereals[-1]["name"]


@solid
def display_results(context, most_calories, most_protein):
    context.log.info(f"Most caloric cereal: {most_calories}")
    context.log.info(f"Most protein-rich cereal: {most_protein}")


@pipeline
def complex_pipeline():
    cereals = download_cereals()
    display_results(
        most_calories=find_highest_calorie_cereal(cereals),
        most_protein=find_highest_protein_cereal(cereals),
    )

execute_pipeline(complex_pipeline, run_config=CONFIG)
2021-06-18 11:24:22 - dagster - INFO - system - 5f12a237-a9c8-434b-b422-8d9744399b4c - display_results - Most caloric cereal: Strawberry Fruit Wheats
2021-06-18 11:24:22 - dagster - INFO - system - 5f12a237-a9c8-434b-b422-8d9744399b4c - display_results - Most protein-rich cereal: Special K
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b42b03f10>

So here it is claiming that Special K is the most protein-rich cereal and that Strawberry Fruit Wheats is the most calorific. Let’s see if pandas agrees.

@solid
def download_cereals():
    return pd.read_csv("https://docs.dagster.io/assets/cereal.csv")


@solid
def find_highest_calorie_cereal(cereals):
    return (
        cereals.sort_values(by="calories", ascending=False)
            .head(1)
            .name
            .item()
    )


@solid
def find_highest_protein_cereal(cereals):
    return (
        cereals.sort_values(by="protein", ascending=False)
            .head(1)
            .name
            .item()
    )


@solid
def display_results(context, most_calories, most_protein):
    context.log.info(f"Most caloric cereal: {most_calories}")
    context.log.info(f"Most protein-rich cereal: {most_protein}")


@pipeline
def complex_pipeline():
    cereals = download_cereals()
    display_results(
        most_calories=find_highest_calorie_cereal(cereals),
        most_protein=find_highest_protein_cereal(cereals),
    )

execute_pipeline(complex_pipeline, run_config=CONFIG)
2021-06-18 11:24:22 - dagster - INFO - system - 61990ded-ce98-47e2-b548-a14682a18554 - display_results - Most caloric cereal: Mueslix Crispy Blend
2021-06-18 11:24:22 - dagster - INFO - system - 61990ded-ce98-47e2-b548-a14682a18554 - display_results - Most protein-rich cereal: Cheerios
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b42da47c0>

So once again pandas disagrees. Mueslix Crispy Blend apparently has more calories, and Cheerios are the most protein-rich. I used to eat Cheerios, so this may be depressing.

df[df.name.isin({"Special K", "Cheerios", "Strawberry Fruit Wheats", "Mueslix Crispy Blend"})]
    name                     mfr  type  calories  protein  fat  sodium  fiber  carbo  sugars  potass  vitamins  shelf  weight  cups  rating
11  Cheerios                 G    C     110       6        2    290     2.0    17.0   1       105     25        1      1.0     1.25  50.764999
46  Mueslix Crispy Blend     K    C     160       3        2    150     3.0    17.0   13      160     25        3      1.5     0.67  30.313351
67  Special K                K    C     110       6        0    230     1.0    16.0   3       55      25        1      1.0     1.00  53.131324
68  Strawberry Fruit Wheats  N    C     90        2        0    15      3.0    15.0   5       90      25        2      1.0     1.00  59.363993

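So this comes down to sorting, but for two different reasons. For protein, Cheerios and Special K are tied at 6, so both answers are correct. Neither version has a tie breaker, and each simply returns whichever tied row its sort happens to put at the top. For calories, the plain csv version is wrong again. It compares strings, so "90" sorts after "160" and Strawberry Fruit Wheats (90 calories) beats Mueslix Crispy Blend (160 calories).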
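If a deterministic answer matters, an explicit tie breaker helps. This sketch uses the protein and calorie values from the table above. It sorts by protein descending and then by name. Using the name as the tie breaker is an arbitrary choice of my own.

```python
# Protein and calorie values copied from the four rows shown above.
cereals = [
    {"name": "Cheerios", "calories": 110, "protein": 6},
    {"name": "Mueslix Crispy Blend", "calories": 160, "protein": 3},
    {"name": "Special K", "calories": 110, "protein": 6},
    {"name": "Strawberry Fruit Wheats", "calories": 90, "protein": 2},
]

# Highest protein first; equal protein falls back to alphabetical name order.
by_protein = sorted(cereals, key=lambda c: (-c["protein"], c["name"]))
print(by_protein[0]["name"])  # Cheerios

# Listing every cereal tied at the top makes the tie visible instead of hidden.
top = by_protein[0]["protein"]
print([c["name"] for c in cereals if c["protein"] == top])  # ['Cheerios', 'Special K']
```

With pandas the equivalent would be `cereals.sort_values(by=["protein", "name"], ascending=[False, True])`.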

Next, the tutorial tests the solid and the pipeline. This will be critical to the success of this evaluation, as I really want to be able to add tests easily to the code we produce.

from dagster import execute_solid, execute_pipeline

def test_hello_cereal_solid():
    res = execute_solid(hello_cereal, run_config=CONFIG)
    assert res.success
    assert len(res.output_value()) == 77


def test_hello_cereal_pipeline():
    res = execute_pipeline(hello_cereal_pipeline, run_config=CONFIG)
    assert res.success
    assert len(res.result_for_solid("hello_cereal").output_value()) == 77

test_hello_cereal_solid()
test_hello_cereal_pipeline()
2021-06-18 11:24:22 - dagster - INFO - system - f41c9791-5044-4246-845c-73455b959cd5 - hello_cereal - Found 77 cereals
2021-06-18 11:24:22 - dagster - INFO - system - ff0c11a5-c007-496c-92ce-59f51dcf10ec - hello_cereal - Found 77 cereals

So the tutorial suggests that this is enough to get started with dagster. This is encouraging, as at this point there is very little fancy configuration involved.
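One thing I would want for our own code is tests that do not download the CSV on every run, as the tutorial tests do. A possible approach is to keep the logic in a plain function and test it on a small in-memory sample, so the test needs neither the network nor dagster. `sugariest_name` is a hypothetical helper, and the sample values come from the sugar comparison earlier in this post.

```python
def sugariest_name(cereals):
    """Return the name of the cereal with the most sugar, comparing sugars as numbers."""
    return max(cereals, key=lambda cereal: int(cereal["sugars"]))["name"]


def test_sugariest_name():
    sample = [
        {"name": "Nut&Honey Crunch", "sugars": "9"},
        {"name": "Smacks", "sugars": "15"},
    ]
    assert sugariest_name(sample) == "Smacks"


test_sugariest_name()
print("ok")
```

A solid could then be a thin wrapper around a function like this, leaving the dagster-level tests to check only the wiring.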


Incremental Usage

To be able to use dagster, I need to pass data into a pipeline and get the results back out. Doing this reliably would let me introduce dagster into a larger application one piece at a time. That matters because I will be converting existing code to use it.

def download_cereals():
    return pd.read_csv("https://docs.dagster.io/assets/cereal.csv")

# how do you configure input for the pipeline?
# this just exists to extract the input out of the context.
@solid
def pipeline_input(context):
    return context.solid_config["df"]

@solid
def find_highest_calorie_cereal(cereals):
    return (
        cereals.sort_values(by="calories", ascending=False)
            .head(1)
            .name
            .item()
    )


@solid
def find_highest_protein_cereal(cereals):
    return (
        cereals.sort_values(by="protein", ascending=False)
            .head(1)
            .name
            .item()
    )


@solid
def display_results(context, most_calories, most_protein):
    context.log.info(f"Most caloric cereal: {most_calories}")
    context.log.info(f"Most protein-rich cereal: {most_protein}")


@pipeline
def complex_pipeline():
    cereals = pipeline_input()
    display_results(
        most_calories=find_highest_calorie_cereal(cereals),
        most_protein=find_highest_protein_cereal(cereals),
    )

def run():
    df = download_cereals()

    output = execute_pipeline(complex_pipeline, run_config={
        "solids": {
            "pipeline_input": {"config": {"df": df}}
        },
        "loggers": {
            "console": {"config": {"log_level": "INFO"}}
        }
    })
    return {
        "most_calories": output.result_for_solid("find_highest_calorie_cereal").output_value(),
        "most_protein": output.result_for_solid("find_highest_protein_cereal").output_value(),
    }

output = run()
output
2021-06-18 11:46:02 - dagster - INFO - system - 51260d48-988e-4065-ad35-3338c444ff1b - display_results - Most caloric cereal: Mueslix Crispy Blend
2021-06-18 11:46:02 - dagster - INFO - system - 51260d48-988e-4065-ad35-3338c444ff1b - display_results - Most protein-rich cereal: Cheerios
{'most_calories': 'Mueslix Crispy Blend', 'most_protein': 'Cheerios'}

If this is what it takes to integrate dagster incrementally, then it’s a no go. I’ve probably made this more complicated than it needs to be, but this is not at all what I am looking for.

What I wanted is something that looks like this:

def some_complex_pipeline(df):
    return {
        "most_calories": find_highest_calorie_cereal(df),
        "most_protein": find_highest_protein_cereal(df),
    }

invoke_pipeline(some_complex_pipeline, df)  # hypothetical: the call I wish existed

The current problem is that pipelines cannot have return values. More broadly, this simple “looks like a function, is not” pipeline does not appear to be possible within dagster. Reviewing the documentation, it also appears that the execute_pipeline method is discouraged. Instead, you are expected to run pipelines from the command line or from dagit.
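For contrast, here is a framework-free sketch of the interface I was hoping for. `invoke_pipeline` is hypothetical and does nothing dagster-specific: it calls the pipeline function, logs the start and end, and returns the result. The two finder functions are plain-Python stand-ins for the solids above. They work on a list of dicts rather than a DataFrame.

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("invoke_pipeline")


def invoke_pipeline(pipeline_fn, *args, **kwargs):
    """Hypothetical: run a pipeline function, log around it, and return its result."""
    log.info("Starting %s", pipeline_fn.__name__)
    result = pipeline_fn(*args, **kwargs)
    log.info("Finished %s", pipeline_fn.__name__)
    return result


def find_highest_calorie_cereal(cereals):
    return max(cereals, key=lambda c: c["calories"])["name"]


def find_highest_protein_cereal(cereals):
    return max(cereals, key=lambda c: c["protein"])["name"]


def some_complex_pipeline(cereals):
    return {
        "most_calories": find_highest_calorie_cereal(cereals),
        "most_protein": find_highest_protein_cereal(cereals),
    }


cereals = [
    {"name": "Cheerios", "calories": 110, "protein": 6},
    {"name": "Mueslix Crispy Blend", "calories": 160, "protein": 3},
]
print(invoke_pipeline(some_complex_pipeline, cereals))
# {'most_calories': 'Mueslix Crispy Blend', 'most_protein': 'Cheerios'}
```

This sketch has none of what makes dagster worth considering, such as dagit or per-step events. It only shows the call-and-return shape.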