Is dagster a good fit for a data science pipeline?
Published
June 3, 2021
I’ve been looking at different pipeline frameworks for data science in production. The aim is to be able to write code freely in small chunks that are then composed into a pipeline. Doing this lets the pipeline handle things like monitoring the performance of individual parts or tracking progress.
I had previously looked into celery for this, but it was a poor fit. Celery works well for long-running services that submit tasks to it. The ideal pipeline would work well in a batch setting as well as a continuous one, and celery seems poorly suited to batch work.
Looking into the different frameworks available, the two standouts appear to be dagster and metaflow. When I assess GitHub projects I look at the stars, the last commit date, and the number of issues. If there are plenty of stars and issues and the last commit is recent, then the project is in good shape. It’s worth looking at a few projects to get an idea of what counts as a large number. To do that I found this awesome pipeline list.
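Collecting these numbers by hand gets tedious. A minimal sketch of pulling them from the public GitHub REST API with only the standard library follows. The repository names dagster-io/dagster and Netflix/metaflow are assumptions about where the projects live, and fetch_repo and repo_metrics are hypothetical helpers. The repository endpoint does not report closed issues; those would need a separate query, for example the search API with type:issue and state:closed.

```python
import json
from urllib.request import urlopen


def fetch_repo(owner: str, name: str) -> dict:
    """Fetch repository metadata from the public GitHub REST API (network call)."""
    # Unauthenticated requests are rate limited, which is fine for a handful of repos.
    with urlopen(f"https://api.github.com/repos/{owner}/{name}") as response:
        return json.load(response)


def repo_metrics(repo: dict) -> dict:
    """Pick out the fields discussed here from a /repos/{owner}/{name} response."""
    return {
        "stars": repo["stargazers_count"],
        # GitHub includes open pull requests in this count.
        "open_issues_and_prs": repo["open_issues_count"],
        # Time of the most recent push, a close proxy for the last commit.
        "last_push": repo["pushed_at"],
    }


# Usage (makes network requests):
# for owner, name in [("dagster-io", "dagster"), ("Netflix", "metaflow")]:
#     print(name, repo_metrics(fetch_repo(owner, name)))
```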
Project Metrics
| project | stars | open issues | closed issues | last commit |
|---|---|---|---|---|
| dagster | 3.4k | 581 | 2,359 | 7 hours ago |
| metaflow | 4.4k | 119 | 159 | 2 days ago |
To justify these metrics, I’ll briefly describe why I chose them. The stars show how popular a project is. A popular project will have more people using it, and so will have more resources (blog posts, Stack Overflow answers, etc.) on using it well or fixing common issues.
The number of issues is another proxy for popularity: a popular project will get more issues, even if it is bug free. Any problem complex enough to need a framework leaves room for confusion, so some bug reports are really support requests. The closed issue count is a sign of project stewardship. Even if every issue were closed without fixing anything, that would at least mean someone is paying attention to them.
The last commit is the most direct measure of project activity.
These metrics are not perfect; broadly, they indicate how much work you might expect to put into fitting a project to your task.
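To put the closed issue counts in proportion, here is a small sketch that turns the figures from the table into a closed-issue ratio. The parse_count and closed_ratio helpers are hypothetical, written only for this illustration.

```python
def parse_count(text: str) -> int:
    """Convert GitHub-style counts such as '3.4k' or '2,359' to integers."""
    text = text.replace(",", "").strip().lower()
    if text.endswith("k"):
        return round(float(text[:-1]) * 1000)
    return int(text)


def closed_ratio(open_issues: str, closed_issues: str) -> float:
    """Fraction of all issues that have been closed."""
    opened, closed = parse_count(open_issues), parse_count(closed_issues)
    return closed / (opened + closed)


# Figures copied from the project metrics table.
table = {
    "dagster": ("3.4k", "581", "2,359"),
    "metaflow": ("4.4k", "119", "159"),
}
for project, (stars, open_issues, closed_issues) in table.items():
    ratio = closed_ratio(open_issues, closed_issues)
    print(f"{project}: {parse_count(stars)} stars, {ratio:.0%} of issues closed")
# dagster: 3400 stars, 80% of issues closed
# metaflow: 4400 stars, 57% of issues closed
```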
Task Suitability
What I need from this framework is something that is:
Easy to use
Easy to test
Easy to scale
The aim is to encourage my other team members to use this framework as a way to make their lives easier and to let the model work well in production. The very first bit of code shown for dagster seems to meet this:
What I’m hoping here is that the team can write their code in small units which are then appropriately annotated. The composition of them should then be reasonably straightforward.
Another positive point about dagster is that it works very well with pandas dataframes. I like pandas a lot as it does a great deal of work under the hood to make computations performant.
One thing this code does not show is how much detail goes into the annotations once you get into the documentation. The documentation does cover the things I am interested in: testing and scaling. To determine how easy it is to use, I guess I’ll just have to try it out.
Logging
Dagster does produce a lot of output at DEBUG level. I’ll run one example to show you this:
Code
from dagster import pipeline, solid, execute_pipeline
import requests
import csv


@solid
def hello_cereal(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    cereals = [row for row in csv.DictReader(lines)]
    context.log.info(f"Found {len(cereals)} cereals")
    return cereals


@pipeline
def hello_cereal_pipeline():
    hello_cereal()


execute_pipeline(hello_cereal_pipeline)
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b81bdd820>
This can be quite useful when debugging, but it obscures the output that I want to see. I can adjust the logging level by using run_config when executing the pipeline:
2021-06-18 11:24:21 - dagster - INFO - system - b39f5876-96e8-4fa6-a810-cbb943224c4a - hello_cereal - Found 77 cereals
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b995164f0>
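The cell that sets up this run_config isn’t shown, but a CONFIG value is reused for the rest of the post. Assuming it only lowers the console logger to INFO, matching the loggers section passed to execute_pipeline in run() further down, it would look something like this (the exact contents are an assumption):

```python
# Assumed contents of CONFIG: only the console logger's level changes.
# This mirrors the "loggers" section of the run_config used in run() further down.
CONFIG = {
    "loggers": {
        "console": {"config": {"log_level": "INFO"}},
    },
}

# Usage with the solid/pipeline API used in this post:
# execute_pipeline(hello_cereal_pipeline, run_config=CONFIG)
```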
So this is the first step of the tutorial and it just shows the execution of a single solid. I find it odd that the logging message is not parameterized. The read of the csv is also a bit primitive considering pandas exists. These are small things though.
Code
import csv
import requests
from dagster import pipeline, solid, execute_pipeline


@solid
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@solid
def find_sugariest(context, cereals):
    sorted_by_sugar = sorted(cereals, key=lambda cereal: cereal["sugars"])
    context.log.info(f'{sorted_by_sugar[-1]["name"]} is the sugariest cereal')


@pipeline
def serial_pipeline():
    find_sugariest(download_cereals())


# CONFIG is the logging run_config from the previous step (that cell is not shown)
execute_pipeline(serial_pipeline, run_config=CONFIG)
2021-06-18 11:24:22 - dagster - INFO - system - dcfd3c10-daa7-4849-b500-441873f81326 - find_sugariest - Nut&Honey Crunch is the sugariest cereal
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b81abc3d0>
The pipeline determines that Nut&Honey Crunch is the sugariest cereal.
Now that it is actually doing some work, I’m going to try converting this to use pandas.
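The converted cell isn’t reproduced in this post. As a rough sketch of the idea, written as plain functions with the dagster @solid decorators left off, the pandas version of the sugar lookup could look like this. The load_cereals name and the two inline rows standing in for the download (values from the sugars lookup further down) are hypothetical, so the sketch runs offline:

```python
import io

import pandas as pd

# Hypothetical stand-in for https://docs.dagster.io/assets/cereal.csv
CEREAL_CSV = "name,sugars\nNut&Honey Crunch,9\nSmacks,15\n"


def load_cereals() -> pd.DataFrame:
    # read_csv infers an integer dtype for the sugars column
    return pd.read_csv(io.StringIO(CEREAL_CSV))


def find_sugariest(cereals: pd.DataFrame) -> str:
    # Same idea as sorted(...)[-1] in the csv version, but on numeric values
    return cereals.sort_values(by="sugars").iloc[-1]["name"]


print(f"{find_sugariest(load_cereals())} is the sugariest cereal")
# Smacks is the sugariest cereal
```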
2021-06-18 11:24:22 - dagster - INFO - system - 9beb23ad-f4be-47b7-a011-5e4efadcbada - find_sugariest - Smacks is the sugariest cereal
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b42d7ef40>
What is interesting here is that this returns a different result. The result from the original pipeline is Nut&Honey Crunch while the new pipeline has Smacks as top sugar. Let’s review them.
Code
import pandas as pd

df = pd.read_csv("https://docs.dagster.io/assets/cereal.csv")
df[df.name.isin({"Nut&Honey Crunch", "Smacks"})][["name", "sugars"]]
|    | name | sugars |
|---|---|---|
| 48 | Nut&Honey Crunch | 9 |
| 66 | Smacks | 15 |
So Smacks is the sugariest. I think this error was caused by the plain csv reading code interpreting the sugars column as strings. When sorted lexicographically, "9" > "15".
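The difference is easy to reproduce with the standard library alone. In this sketch the two rows from the table above are read with csv.DictReader and sorted both as strings and as integers:

```python
import csv
import io

# The two rows from the table above, as csv text
rows = list(csv.DictReader(io.StringIO("name,sugars\nNut&Honey Crunch,9\nSmacks,15\n")))

# DictReader yields strings, so this compares "9" with "15" character by character
by_text = sorted(rows, key=lambda row: row["sugars"])
# Converting to int gives the numeric ordering
by_number = sorted(rows, key=lambda row: int(row["sugars"]))

print(by_text[-1]["name"])    # Nut&Honey Crunch
print(by_number[-1]["name"])  # Smacks
```

In the csv version the fix would be a small change to the sort key, from cereal["sugars"] to int(cereal["sugars"]).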
2021-06-18 11:24:22 - dagster - INFO - system - 5f12a237-a9c8-434b-b422-8d9744399b4c - display_results - Most caloric cereal: Strawberry Fruit Wheats
2021-06-18 11:24:22 - dagster - INFO - system - 5f12a237-a9c8-434b-b422-8d9744399b4c - display_results - Most protein-rich cereal: Special K
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b42b03f10>
So here it is claiming that Special K is the most protein-rich cereal and that Strawberry Fruit Wheats is the most calorific. Let’s see if pandas agrees.
2021-06-18 11:24:22 - dagster - INFO - system - 61990ded-ce98-47e2-b548-a14682a18554 - display_results - Most caloric cereal: Mueslix Crispy Blend
2021-06-18 11:24:22 - dagster - INFO - system - 61990ded-ce98-47e2-b548-a14682a18554 - display_results - Most protein-rich cereal: Cheerios
<dagster.core.execution.results.PipelineExecutionResult at 0x7f7b42da47c0>
So once again pandas disagrees. The caloric content of Mueslix Crispy Blend is apparently higher, and Cheerios are the most protein rich. I used to eat Cheerios so this may be depressing.
Code
df[df.name.isin({"Special K", "Cheerios", "Strawberry Fruit Wheats", "Mueslix Crispy Blend"})]
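|    | name | mfr | type | calories | protein | fat | sodium | fiber | carbo | sugars | potass | vitamins | shelf | weight | cups | rating |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 11 | Cheerios | G | C | 110 | 6 | 2 | 290 | 2.0 | 17.0 | 1 | 105 | 25 | 1 | 1.0 | 1.25 | 50.764999 |
| 46 | Mueslix Crispy Blend | K | C | 160 | 3 | 2 | 150 | 3.0 | 17.0 | 13 | 160 | 25 | 3 | 1.5 | 0.67 | 30.313351 |
| 67 | Special K | K | C | 110 | 6 | 0 | 230 | 1.0 | 16.0 | 3 | 55 | 25 | 1 | 1.0 | 1.00 | 53.131324 |
| 68 | Strawberry Fruit Wheats | N | C | 90 | 2 | 0 | 15 | 3.0 | 15.0 | 5 | 90 | 25 | 2 | 1.0 | 1.00 | 59.363993 |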
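For protein this is a matter of sorting: Cheerios and Special K both have a protein value of 6, and without a tie breaker either answer is correct. Calories are different. Strawberry Fruit Wheats has 90 calories against 160 for Mueslix Crispy Blend, so the original pipeline has hit the same string comparison problem as before: as text, "90" sorts after "160".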
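If a single answer is needed, the tie has to be handled explicitly. A minimal stdlib sketch over the four rows in the table above (only the name, calories and protein columns), with hypothetical top_by and top_one helpers: one returns every tied name, the other breaks ties alphabetically.

```python
# name, calories and protein for the four cereals in the table above
cereals = [
    {"name": "Cheerios", "calories": 110, "protein": 6},
    {"name": "Mueslix Crispy Blend", "calories": 160, "protein": 3},
    {"name": "Special K", "calories": 110, "protein": 6},
    {"name": "Strawberry Fruit Wheats", "calories": 90, "protein": 2},
]


def top_by(rows, column):
    """Return every name that shares the maximum value, making ties visible."""
    best = max(row[column] for row in rows)
    return sorted(row["name"] for row in rows if row[column] == best)


def top_one(rows, column):
    """Return a single name: highest value first, then alphabetical order as tie breaker."""
    return min(rows, key=lambda row: (-row[column], row["name"]))["name"]


print(top_by(cereals, "protein"))    # ['Cheerios', 'Special K']
print(top_by(cereals, "calories"))   # ['Mueslix Crispy Blend']
print(top_one(cereals, "protein"))   # Cheerios
```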
Now we can test the results of the solid and the pipeline. This will be critical to adopting dagster, as I really want to be able to add tests easily to code that has already been written.
Code
from dagster import execute_solid, execute_pipeline


def test_hello_cereal_solid():
    res = execute_solid(hello_cereal, run_config=CONFIG)
    assert res.success
    assert len(res.output_value()) == 77


def test_hello_cereal_pipeline():
    res = execute_pipeline(hello_cereal_pipeline, run_config=CONFIG)
    assert res.success
    assert len(res.result_for_solid("hello_cereal").output_value()) == 77


test_hello_cereal_solid()
test_hello_cereal_pipeline()
2021-06-18 11:24:22 - dagster - INFO - system - f41c9791-5044-4246-845c-73455b959cd5 - hello_cereal - Found 77 cereals
2021-06-18 11:24:22 - dagster - INFO - system - ff0c11a5-c007-496c-92ce-59f51dcf10ec - hello_cereal - Found 77 cereals
So the tutorial suggests that this is enough to get started with dagster. This is encouraging, as very little fancy configuration has been applied at this point.
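One way to keep testing cheap is to keep the logic in plain functions and test those directly, leaving execute_solid and execute_pipeline for checking the wiring. A minimal sketch with the standard unittest module; find_sugariest here is a hypothetical plain-function version of the solid, and the test pins the string-sorting problem found earlier:

```python
import unittest


def find_sugariest(cereals):
    """Hypothetical plain-function version of the find_sugariest logic."""
    # int() makes the comparison numeric even when values arrive as strings
    return max(cereals, key=lambda cereal: int(cereal["sugars"]))["name"]


class FindSugariestTest(unittest.TestCase):
    def test_sugar_is_compared_numerically(self):
        # Strings, as csv.DictReader would produce them; "9" > "15" as text
        cereals = [
            {"name": "Nut&Honey Crunch", "sugars": "9"},
            {"name": "Smacks", "sugars": "15"},
        ]
        self.assertEqual(find_sugariest(cereals), "Smacks")
```

This runs under python -m unittest or pytest without starting dagster at all.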
Incremental Usage
To be able to use this, I need to be able to pass data in and get the result out. Doing this reliably would allow dagster to be introduced into a larger application piece by piece, which matters because I will be converting existing code to use it.
Code
def download_cereals():
    return pd.read_csv("https://docs.dagster.io/assets/cereal.csv")


# how do you configure input for the pipeline?
# this just exists to extract the input out of the context.
@solid
def pipeline_input(context):
    return context.solid_config["df"]


@solid
def find_highest_calorie_cereal(cereals):
    return (
        cereals.sort_values(by="calories", ascending=False)
        .head(1)
        .name
        .item()
    )


@solid
def find_highest_protein_cereal(cereals):
    return (
        cereals.sort_values(by="protein", ascending=False)
        .head(1)
        .name
        .item()
    )


@solid
def display_results(context, most_calories, most_protein):
    context.log.info(f"Most caloric cereal: {most_calories}")
    context.log.info(f"Most protein-rich cereal: {most_protein}")


@pipeline
def complex_pipeline():
    cereals = pipeline_input()
    display_results(
        most_calories=find_highest_calorie_cereal(cereals),
        most_protein=find_highest_protein_cereal(cereals),
    )


def run():
    df = download_cereals()
    output = execute_pipeline(
        complex_pipeline,
        run_config={
            "solids": {
                "pipeline_input": {"config": {"df": df}}
            },
            "loggers": {
                "console": {"config": {"log_level": "INFO"}}
            },
        },
    )
    return {
        "most_calories": output.result_for_solid("find_highest_calorie_cereal").output_value(),
        "most_protein": output.result_for_solid("find_highest_protein_cereal").output_value(),
    }
Code
output = run()
output
2021-06-18 11:46:02 - dagster - INFO - system - 51260d48-988e-4065-ad35-3338c444ff1b - display_results - Most caloric cereal: Mueslix Crispy Blend
2021-06-18 11:46:02 - dagster - INFO - system - 51260d48-988e-4065-ad35-3338c444ff1b - display_results - Most protein-rich cereal: Cheerios
If this is what is required to integrate dagster incrementally, then it’s a no go. I’ve probably made this more complicated than it has to be, but this is not at all what I am looking for.
The current problem is that pipelines cannot have return values. More broadly, this simple “looks like a function, is not” pipeline does not appear to be possible within dagster. Reviewing the documentation, it also appears that execute_pipeline is discouraged; instead you are expected to run pipelines from the command line or from dagit.
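To make the requirement concrete, here is a plain-Python sketch of the “looks like a function” shape: data goes in as an argument and results come back as a return value, while a hypothetical step decorator (not part of dagster) records per-step timings. It has none of dagster’s scheduling, UI or storage; it only illustrates the calling convention.

```python
import time
from functools import wraps

# step name -> seconds taken by its most recent call
timings = {}


def step(func):
    """Hypothetical decorator: times a step while still returning its value."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        timings[func.__name__] = time.perf_counter() - start
        return result
    return wrapper


@step
def find_highest_calorie_cereal(cereals):
    return max(cereals, key=lambda cereal: cereal["calories"])["name"]


@step
def find_highest_protein_cereal(cereals):
    return max(cereals, key=lambda cereal: cereal["protein"])["name"]


def cereal_pipeline(cereals):
    """Looks like a function and is one: data in as an argument, results out."""
    return {
        "most_calories": find_highest_calorie_cereal(cereals),
        "most_protein": find_highest_protein_cereal(cereals),
    }


print(cereal_pipeline([
    {"name": "Cheerios", "calories": 110, "protein": 6},
    {"name": "Mueslix Crispy Blend", "calories": 160, "protein": 3},
]))
# {'most_calories': 'Mueslix Crispy Blend', 'most_protein': 'Cheerios'}
```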