Torch Multiprocessing Pipeline

Handling pipe/spread/collect patterns across process boundaries in pytorch
Published: June 25, 2021

I have recently evaluated dagster to see if it could work as a data science pipeline framework. This evaluation was triggered by the project that I am working on at $WORK.

The code is written in python, and it involves a data flow which seems to be hard to optimise with multiprocessing. This post is an investigation of how to represent this flow in python and optimise it. Ray is evaluated as a possible framework for this flow.


The Flow

This is the flow of the data. It’s pretty simple. I’ve tried to show the cardinality of the flow with the edge ends.

The edge:

  • |- - one input
  • -| - one output
  • >- - zero or more inputs
  • -< - zero or more outputs

“one” input or output might be a list or other collection, but it can be treated and passed around as a single thing.

So an arrow like:

  • |-| is a one to one mapping
  • |-< is a one to many mapping
  • >-| is a many to one mapping

The constraint that I can’t show well is that one input to the overall flow results in one output.
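
To make the flow concrete, here is a rough sketch of the stages as Python signatures. The names match the mock implementations that appear later in this post, and the comments use the edge markers from the list above.

Code
# A text rendering of the flow, using the stage names defined later in this post.
# The cardinality of each stage is noted with the edge markers described above.

def feature_extraction(unit): ...             # |-| one unit in, one unit out
def filtering(unit): ...                      # |-< one unit in, two outputs (filtered, unfiltered)
def batching(unfiltered): ...                 # |-< one collection in, zero or more batches out
def inference(batch): ...                     # |-| one batch in, one batch out (the GPU stage)
def postprocessing(filtered, processed): ...  # >-| many inputs in, one unit out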

Now I want to make the GPU Inference stage operate at full capacity. The GPU is expensive so I want to maximize utilization.

The preprocessing steps can result in gaps in utilization, so I want to run those in a separate process so they can run while the GPU is working. Then the GPU can pick up new work as it becomes available. So the rough split of processes becomes:

The ideal flow gets a little more complex than this. The Filtering step removes entries that do not need inference. Removing these saves money by not wasting GPU time when the answer is already known.

This filtering step can end up with one path or the other being skipped entirely: if the entire input is filtered then no inference is required, and if none of the input is filtered then nothing gets skipped. When setting up communication between processes this can be quite a difficult condition to signal.

Multiprocessing is complex as well - the number of processes for preprocessing or postprocessing is unbounded, but only a single process can access the GPU at any one time. This is because I don’t think the model can be shared between processes efficiently.

The final thing to consider is that this pipeline must not stall at all: there cannot be a gap between the different entries passing through it, which means several things could be in flight at any one time.


The Investigation

After all this I can come up with some simulation code that will perform the required steps. The data has to indicate:

  • What batch it will be in
  • If it is filtered
  • Some value for GPU inference
  • Some way to validate that every value is present

It should be easy to come up with this:

Code
from dataclasses import dataclass

@dataclass
class Entry:
    unit_index: int
    index: int
    is_filtered: bool
    batch: int
    value: float

The next thing is to track the utilization of the GPU. It should be possible to do this by tracking the overall time taken for the pipeline and tracking the time executing the GPU function.

Code
from typing import *
from datetime import datetime

class Tracker:
    def __init__(self, function: Callable[..., Any]) -> None:
        self.function = function
        self.time = 0.

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        start = datetime.now()
        try:
            return self.function(*args, **kwargs)
        finally:
            end = datetime.now()
            self.time += (end - start).total_seconds()

    def reset(self) -> None:
        self.time = 0.
Code
@Tracker
def example_method(x, y, z):
    total = 0.
    for _ in range(z):
        total += x**y
    return total
Code
example_method(0.24, 1.33, 100_000_000)
14985835.178920489
Code
example_method.time
4.883602
Code
example_method.reset()
Code
example_method.time
0.0

So this seems pretty easy to use. The next thing is to come up with some example methods. These methods are just mocks - they will take some time to run, thanks to sleeping, and the “GPU” will make some changes to the values - but they do not involve real work. Mocking the methods like this allows me to concentrate on the problem of correct multiprocessing without also having to get the processing implementation correct.

Here “batch” is an overloaded term, so I will refer to what enters the data flow as a unit, and the GPU then processes individual batches taken from a unit.

Code
#collapse

from time import sleep
from collections import defaultdict
from dataclasses import replace

def feature_extraction(unit: List[Entry]) -> List[Entry]:
    sleep(0.1)
    return unit

def filtering(unit: List[Entry]) -> Tuple[List[Entry], List[Entry]]:
    sleep(0.1)
    filtered = []
    unfiltered = []
    for entry in unit:
        if entry.is_filtered:
            filtered.append(entry)
        else:
            unfiltered.append(entry)
    return filtered, unfiltered

def batching(unit: List[Entry]) -> Iterator[List[Entry]]:
    batches = defaultdict(list)
    for entry in unit:
        batches[entry.batch].append(entry)
    for batch in batches.values():
        sleep(0.1)
        yield batch

@Tracker
def inference(batch: List[Entry]) -> List[Entry]:
    result = [
        replace(entry, value=-entry.value)
        for entry in batch
    ]
    sleep(1.0)
    return result

def postprocessing(filtered: List[Entry], processed: List[Entry]) -> List[Entry]:
    combined = filtered + processed
    sleep(0.1)
    return sorted(combined, key=lambda entry: entry.index)

To test these methods I want to be able to generate a unit of work, and to check that it has been correctly processed.

Code
from random import shuffle

def generate_unit(unit_index: int, batch_size: int, batches: int, filtered: int) -> List[Entry]:
    filtered_entries = [
        Entry(unit_index=unit_index, index=index, is_filtered=True, batch=0, value=index)
        for index in range(filtered)
    ]
    index_iter = iter(range(filtered, (batches*batch_size)+filtered))
    unfiltered_entries = [
        Entry(unit_index=unit_index, index=index, is_filtered=False, batch=batch, value=index)
        for batch in range(batches)
        for _, index in zip(range(batch_size), index_iter)
    ]
    unit = filtered_entries + unfiltered_entries
    shuffle(unit)
    return unit

def validate_unit(original: List[Entry], actual: List[Entry]) -> bool:
    original = sorted(original, key=lambda entry: entry.index)
    original = [
        replace(entry, value=entry.value if entry.is_filtered else -entry.value)
        for entry in original
    ]
    return original == actual

This is what a unit of work looks like:

Code
generate_unit(unit_index=0, batch_size=3, batches=2, filtered=3)
[Entry(unit_index=0, index=6, is_filtered=False, batch=1, value=6),
 Entry(unit_index=0, index=4, is_filtered=False, batch=0, value=4),
 Entry(unit_index=0, index=7, is_filtered=False, batch=1, value=7),
 Entry(unit_index=0, index=8, is_filtered=False, batch=1, value=8),
 Entry(unit_index=0, index=1, is_filtered=True, batch=0, value=1),
 Entry(unit_index=0, index=3, is_filtered=False, batch=0, value=3),
 Entry(unit_index=0, index=5, is_filtered=False, batch=0, value=5),
 Entry(unit_index=0, index=2, is_filtered=True, batch=0, value=2),
 Entry(unit_index=0, index=0, is_filtered=True, batch=0, value=0)]

So to start with let’s have a sequential pipeline that does no multiprocessing. If this doesn’t work then all hope is lost!

Code
@Tracker
def sequential_pipeline(unit: List[Entry]) -> List[Entry]:
    features = feature_extraction(unit)
    filtered, unfiltered = filtering(features)
    processed = [
        entry
        for batch in batching(unfiltered)
        for entry in inference(batch)
    ]
    return postprocessing(filtered, processed)

def evaluate(unit: List[Entry], pipeline: Callable[[List[Entry]], List[Entry]]) -> Tuple[bool, float]:
    pipeline.reset()
    inference.reset()
    
    output = pipeline(unit)
    
    return validate_unit(unit, output), inference.time / pipeline.time

Now we can evaluate it. This returns a boolean indicating if the validation passes as well as the fraction of the time that was spent in the GPU. The overall aim of this work is to maximize the time spent in the GPU by performing all pre and post processing in parallel.

Code
evaluate(
    generate_unit(unit_index=0, batch_size=3, batches=2, filtered=3),
    sequential_pipeline,
)
(True, 0.7993328937579227)
Code
evaluate(
    generate_unit(unit_index=0, batch_size=10, batches=3, filtered=10),
    sequential_pipeline,
)
(True, 0.8328502787727458)
Code
evaluate(
    generate_unit(unit_index=0, batch_size=10, batches=3, filtered=30),
    sequential_pipeline,
)
(True, 0.8329693463429567)

So here the time breaks down as follows:

  • feature_extraction 0.1 seconds
  • filtering 0.1 seconds
  • batching 0.1 seconds per batch
  • inference 1.0 seconds per batch
  • postprocessing 0.1 seconds

So there is a flat 0.3 seconds plus 1.1 seconds per batch. The “GPU” spends 1 second per batch, so for 3 batches there is a total of 3.6 seconds spent with 3 seconds in the GPU. That results in 83% of the time being spent in the GPU. In the first test there were 2 batches, so the total time was 2.5 seconds with 2 seconds in the GPU (80%). It does look like the time tracking works, as the fractions returned are very close to these theoretical ratios.
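
As a sanity check, here is the same arithmetic written out as a helper (nothing new, just the numbers from the paragraph above):

Code
def expected_gpu_fraction(batches: int) -> float:
    # 0.1s each for feature extraction, filtering and postprocessing,
    # plus 0.1s batching and 1.0s inference per batch
    fixed = 0.3
    per_batch = 1.1
    gpu = 1.0 * batches
    return gpu / (fixed + batches * per_batch)

expected_gpu_fraction(2), expected_gpu_fraction(3)  # roughly (0.80, 0.83) - matching the measured fractions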

Given the strongly sequential nature of this, performance improvements will only be experienced when processing multiple batches. So let’s try that.


Really Over Complicating Things

So I need to do a bunch of tracking to get things working in parallel. I also need to maintain this code, so it needs to get out of my way as much as possible.

When trying to make a good pattern the first thing to do is to just make it badly. So I am going to maintain the serial pipeline but every connection between the different areas is now a queue. Just as a reminder here is the pipeline again:

So I’m going to need three queues:

  • Batching to GPU Inference
  • GPU Inference to Post Processing
  • Filtering to Post Processing

To make it easier to track the completion of the work I am going to return a future from the pipeline. This will be handy as the postprocessing section of the pipeline can maintain a map of unit_index to future to allow them to be completed appropriately.
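
A rough sketch of the bookkeeping I have in mind is below. The PipelineClient name and its methods are made up for illustration, and it assumes a future object can be created and completed by hand, which is exactly the question the documentation raises next.

Code
from concurrent.futures import Future
from multiprocessing import Queue
from threading import Thread
from typing import Dict, List

class PipelineClient:
    # hypothetical driver-side wrapper around the input and output queues
    def __init__(self, input_queue: Queue, output_queue: Queue) -> None:
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.futures: Dict[int, Future] = {}
        Thread(target=self._drain_results, daemon=True).start()

    def submit(self, unit_index: int, unit: List[Entry]) -> Future:
        # hand the unit to the preprocessing processes and give the caller a future
        future: Future = Future()
        self.futures[unit_index] = future
        self.input_queue.put((unit_index, unit))
        return future

    def _drain_results(self) -> None:
        # the postprocessing process pushes (unit_index, result) onto the output queue
        while True:
            unit_index, result = self.output_queue.get()
            self.futures.pop(unit_index).set_result(result)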

After looking up futures in the python documentation it seems that the concurrent.futures Future is not intended to be created directly. I should instead use the asyncio future, I guess?


Async - Does This Work?

So the first thing to do might be to just test that this works at all.

The very first thing the asyncio Future documentation says is that it is “not thread safe”. So I would need a way to handle the future objects without breaking the thread / process boundary. I’m not sure I can do that - the submission of the task fundamentally involves putting something on a queue, and to populate the future I need to read from a different queue. I can’t nicely do both in the same process/thread.

It feels like this is the wrong direction. Asyncio isn’t about multiprocessing but instead cooperative multitasking on a single thread. The GPU needs to run concurrently with the preprocessing and postprocessing. I don’t think this works.


The Ray Framework

So I’ve been thinking about this quite a bit and discussing it with others. The thing is that coming up with a queue based system that eventually completes futures is actually quite complex and it feels like I am going against the grain of the multiprocessing code.

I recently evaluated dagster to see if it would be a good fit for this kind of thing, and I thought that it was not. I’ve been looking into other frameworks and Ray looks like it might be suitable. The reason I like it is that it can split different tasks across different multiprocessing contexts and it has explicit GPU / pytorch support.

So let’s see how easy it is to recast this problem as a ray data flow. I’m going to see if I can hack something together after reading a small amount of the documentation.

Code
import ray
ray.init()

@ray.remote
def run_ray(unit: List[Entry]) -> List[Entry]:
    filtered, batched = ray_preprocessing.remote(unit)
    processed = [
        ray_inference.remote(batch)
        for batch in batched
    ]
    return ray_postprocessing.remote(filtered, processed)

@ray.remote
def ray_preprocessing(unit: List[Entry]) -> Tuple[List[Entry], Iterator[List[Entry]]]:
    features = feature_extraction(unit)
    filtered, unfiltered = filtering(features)
    batched = batching(unfiltered)

    return filtered, batched

@ray.remote(num_gpus=1)
def ray_inference(batch: List[Entry]) -> List[Entry]:
    return inference(batch)

@ray.remote
def ray_postprocessing(filtered: List[Entry], processed: Iterator[List[Entry]]) -> List[Entry]:
    flat_processed = [
        entry
        for batch in ray.get(processed)
        for entry in batch
    ]
    return postprocessing(filtered, flat_processed)
2021-06-28 11:05:14,950 INFO services.py:1272 -- View the Ray dashboard at http://127.0.0.1:8265
Code
input_unit = generate_unit(unit_index=0, batch_size=3, batches=2, filtered=3)
output_unit = ray.get(run_ray.remote(input_unit))
RayTaskError(TypeError): ray::run_ray() (pid=487092, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-122-e32f07b923ac>", line 27, in run_ray
TypeError: cannot unpack non-iterable ray._raylet.ObjectRef object
(pid=487092) 2021-06-28 11:05:16,819    ERROR worker.py:78 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::ray_preprocessing() (pid=487102, ip=192.168.1.54)
(pid=487092)   File "python/ray/_raylet.pyx", line 530, in ray._raylet.execute_task
(pid=487092)   File "python/ray/_raylet.pyx", line 531, in ray._raylet.execute_task
(pid=487092)   File "python/ray/_raylet.pyx", line 1594, in ray._raylet.CoreWorker.store_task_outputs
(pid=487092)   File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/serialization.py", line 326, in serialize
(pid=487092)     return self._serialize_to_msgpack(value)
(pid=487092)   File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/serialization.py", line 306, in _serialize_to_msgpack
(pid=487092)     self._serialize_to_pickle5(metadata, python_objects)
(pid=487092)   File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/serialization.py", line 266, in _serialize_to_pickle5
(pid=487092)     raise e
(pid=487092)   File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/serialization.py", line 262, in _serialize_to_pickle5
(pid=487092)     inband = pickle.dumps(
(pid=487092)   File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 73, in dumps
(pid=487092)     cp.dump(obj)
(pid=487092)   File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 580, in dump
(pid=487092)     return Pickler.dump(self, obj)
(pid=487092) TypeError: cannot pickle 'generator' object

Ray is unhappy about two things here: ray_preprocessing.remote returns a single ObjectRef, so the tuple unpacking in run_ray fails, and the generator of batches cannot be serialized when ray_preprocessing tries to return it. It’s quite desirable to retain the progressive batching rather than vivifying the full set at once, so keeping it requires intertwining the different sections.
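
The serialization half of the problem is easy to reproduce on its own: ray pickles task return values (via cloudpickle), and generators cannot be pickled. A tiny standalone check with the standard pickle module behaves the same way:

Code
import pickle

batch_gen = (x for x in range(3))

# pickle.dumps(batch_gen) raises: TypeError: cannot pickle 'generator' object
pickle.dumps(list(batch_gen))  # fine once the batches are materialised into a list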

Code
@ray.remote
def run_ray(unit: List[Entry]) -> List[Entry]:
    # returns a single value! ray.get needed to unpack
    filtered, batched = ray.get(ray_preprocessing.remote(unit))

    processed = [
        ray_inference.remote(batch)
        for batch in batched
    ]
    
    # without this the ray.get on run_ray returns an object reference (i.e. a future in a future)
    return ray.get(ray_postprocessing.remote(filtered, processed))

@ray.remote
def ray_preprocessing(unit: List[Entry]) -> Tuple[List[Entry], Iterator[List[Entry]]]:
    features = feature_extraction(unit)
    filtered, unfiltered = filtering(features)
    batched = list(batching(unfiltered))

    return filtered, batched
Code
input_unit = generate_unit(unit_index=0, batch_size=3, batches=2, filtered=3)
output_unit = ray.get(run_ray.remote(input_unit))
output_unit
[Entry(unit_index=0, index=0, is_filtered=True, batch=0, value=0),
 Entry(unit_index=0, index=1, is_filtered=True, batch=0, value=1),
 Entry(unit_index=0, index=2, is_filtered=True, batch=0, value=2),
 Entry(unit_index=0, index=3, is_filtered=False, batch=0, value=-3),
 Entry(unit_index=0, index=4, is_filtered=False, batch=0, value=-4),
 Entry(unit_index=0, index=5, is_filtered=False, batch=0, value=-5),
 Entry(unit_index=0, index=6, is_filtered=False, batch=1, value=-6),
 Entry(unit_index=0, index=7, is_filtered=False, batch=1, value=-7),
 Entry(unit_index=0, index=8, is_filtered=False, batch=1, value=-8)]

Eyeballing these results, they look correct. This is quite a hopeful start, as ray can at least express the problem.

I think I need to make the pipeline more linear to avoid using get in it. This can be done if I push the filtering right to the point where I invoke the GPU, skipping inference if the batch does not need it.

Code
#collapse

@ray.remote
def run_ray(unit: List[Entry]) -> List[Entry]:
    features = ray_feature_extraction.remote(unit)
    batch_iter = ray.get(ray_batching.remote(features))
    batch_iter = batch_iter.for_each(filtered_inference)
    processed = ray_postprocessing.remote(batch_iter)
    return ray.get(processed)

@ray.remote
def ray_feature_extraction(unit: List[Entry]) -> List[Entry]:
    return feature_extraction(unit)

@ray.remote
def ray_batching(unit: List[Entry]) -> Iterator[Tuple[bool, List[Entry]]]:
    filtered, unfiltered = filtering(unit)
    filtered_iter = iter([(True, filtered)])
    # unfiltered_iter = ((False, batch) for batch in unfiltered)
    return ray.util.iter.from_iterators([
        filtered_iter,
        lambda: ((False, batch) for batch in batching(unfiltered)),
    ])

@ray.remote
def ray_filtered_inference(entry: Tuple[bool, List[Entry]]) -> List[Entry]:
    filtered, batch = entry
    if filtered:
        return batch
    return ray.get(ray_inference.remote(batch))

def filtered_inference(entry: Tuple[bool, List[Entry]]) -> List[Entry]:
    filtered, batch = entry
    if filtered:
        return batch
    return ray.get(ray_inference.remote(batch))

@ray.remote
def ray_postprocessing(processed: Iterator[List[List[Entry]]]) -> List[Entry]:
    flattened_processed = [
        entry
        for batch in processed.gather_async()
        for entry in batch
    ]
    return postprocessing_2(flattened_processed)

# naming things is hard, this just does the post processing without combining the lists
def postprocessing_2(combined: List[Entry]) -> List[Entry]:
    sleep(0.1)
    return sorted(combined, key=lambda entry: entry.index)
Code
input_unit = generate_unit(unit_index=0, batch_size=3, batches=2, filtered=3)
output_unit = ray.get(run_ray.remote(input_unit))
output_unit
[Entry(unit_index=0, index=0, is_filtered=True, batch=0, value=0),
 Entry(unit_index=0, index=1, is_filtered=True, batch=0, value=1),
 Entry(unit_index=0, index=2, is_filtered=True, batch=0, value=2),
 Entry(unit_index=0, index=3, is_filtered=False, batch=0, value=-3),
 Entry(unit_index=0, index=4, is_filtered=False, batch=0, value=-4),
 Entry(unit_index=0, index=5, is_filtered=False, batch=0, value=-5),
 Entry(unit_index=0, index=6, is_filtered=False, batch=1, value=-6),
 Entry(unit_index=0, index=7, is_filtered=False, batch=1, value=-7),
 Entry(unit_index=0, index=8, is_filtered=False, batch=1, value=-8)]
Code
@Tracker
def ray_pipeline(unit: List[Entry]) -> List[Entry]:
    return ray.get(run_ray.remote(unit))
Code
evaluate(
    generate_unit(unit_index=0, batch_size=10, batches=3, filtered=30),
    ray_pipeline,
)
(True, 0.0)

Inference is called but the Tracker doesn’t record any time. I think this is because ray runs the task in a separate worker process, so the driver’s copy of the Tracker never sees the accumulated time. I’ll have to just time the runs instead.
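
For reference, a hypothetical way to recover the number anyway would be to measure inside the remote task and return the elapsed seconds alongside the result - ray_inference_timed below is just an illustration, not part of the pipeline.

Code
from datetime import datetime

@ray.remote(num_gpus=1)
def ray_inference_timed(batch: List[Entry]) -> Tuple[List[Entry], float]:
    # measure inside the worker process, which is where the time is actually spent
    start = datetime.now()
    result = inference(batch)
    return result, (datetime.now() - start).total_seconds()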

Code
input_unit = generate_unit(unit_index=0, batch_size=10, batches=3, filtered=30)
Code
%%timeit

evaluate(input_unit, sequential_pipeline)
3.61 s ± 795 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
Code
%%timeit

evaluate(input_unit, ray_pipeline)
4.02 s ± 5.08 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

So the use of ray comes at quite a cost for a single sequential run. It may be that the preprocessing steps are too small and simple to benefit much from it, given how much fan-out is being done. I wonder if the overhead can be reduced by making the inference and the overall pipeline the only two remotes.

Before doing that it would be good to see if this approach actually results in greater performance for parallel executions.

Code
def evaluate_multiple(
    unit: List[Entry],
    pipeline: Callable[[List[Entry]], Any],
    collector: Callable[[List[Any]], List[List[Entry]]],
    count: int
) -> bool:
    intermediate = [
        pipeline(unit)
        for _ in range(count)
    ]
    final = collector(intermediate)
    
    return all(validate_unit(unit, output) for output in final)
Code
%%timeit

evaluate_multiple(
    unit=input_unit,
    pipeline=sequential_pipeline,
    collector=lambda x: x,
    count=7,
)
25.2 s ± 2.89 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Code
%%timeit

evaluate_multiple(
    unit=input_unit,
    pipeline=run_ray.remote,
    collector=ray.get,
    count=7,
)
2021-06-28 13:31:34,950 WARNING worker.py:1114 -- WARNING: 61 PYTHON workers have been started on a node of the id: bbc0cc928807fc34feebcaaf8a3f5bc198d38fac5711aa8fc3fe9bdc and address: 192.168.1.54. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.
2021-06-28 13:32:19,477 WARNING worker.py:1114 -- WARNING: 84 PYTHON workers have been started on a node of the id: bbc0cc928807fc34feebcaaf8a3f5bc198d38fac5711aa8fc3fe9bdc and address: 192.168.1.54. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.
2021-06-28 13:33:04,132 WARNING worker.py:1114 -- WARNING: 113 PYTHON workers have been started on a node of the id: bbc0cc928807fc34feebcaaf8a3f5bc198d38fac5711aa8fc3fe9bdc and address: 192.168.1.54. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.
2021-06-28 13:33:26,465 WARNING worker.py:1114 -- WARNING: 128 PYTHON workers have been started on a node of the id: bbc0cc928807fc34feebcaaf8a3f5bc198d38fac5711aa8fc3fe9bdc and address: 192.168.1.54. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.
22.3 s ± 147 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

So my crappy use of ray already produces some savings (and a lot of warnings). After reviewing the linked issue it seems that calling ray.get in the remote tasks can be a cause, so I should aggressively try to reduce that.

It would be good to understand the degree of saving that this multi processing has achieved. When I was evaluating the run before how much of the time was spent on the GPU?

Code
evaluate(unit=input_unit, pipeline=sequential_pipeline)
(True, 0.8329377757450576)

The GPU was being used 83% of the time. If the sequential pipeline evaluation maintains this then \(25.2 * 0.83 \approx 20.9\) seconds is spent using the GPU. The ideal multiprocessing system cannot take less time than this plus the pre and post processing that cannot overlap with GPU work. Reminding ourselves of the sequential pipeline:

  • feature_extraction 0.1 seconds
  • filtering 0.1 seconds
  • batching 0.1 seconds per batch
  • inference 1.0 seconds per batch
  • postprocessing 0.1 seconds

The GPU needs 1 second per batch, so 7 units of 3 batches each is 21 seconds of GPU time. The only pre and post processing that cannot overlap with GPU work is the lead-in before the first batch is ready (feature extraction, filtering and batching one batch: 0.3 seconds) and the postprocessing of the final unit (0.1 seconds). So our ideal multiprocessing system cannot take less than about 21.4 seconds, which means the observed time of 22.3 seconds carries around a 4-5% overhead from the ray framework.
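
Written out as arithmetic:

Code
gpu_time = 7 * 3 * 1.0     # 7 units of 3 batches, 1 second per batch on the GPU
lead_in = 0.1 + 0.1 + 0.1  # feature extraction + filtering + batching the first batch
lead_out = 0.1             # postprocessing of the final unit
ideal = gpu_time + lead_in + lead_out
ideal, 22.3 / ideal - 1    # roughly (21.4, 0.04)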


Optimizing Ray

To optimize the use of ray I want to remove as many vivifications from the pipeline as possible. Ideally only the postprocessing step would vivify the results, as it needs to sort them. So what happens if the code is just copied and every get is dropped?

Code
#collapse

@ray.remote
def run_ray(unit: List[Entry]) -> List[Entry]:
    features = ray_feature_extraction.remote(unit)
    batch_iter = ray_batching.remote(features)
    # batch_iter = batch_iter.for_each(filtered_inference)
    batch_iter = ray_filtered_inference.remote(batch_iter)
    processed = ray_postprocessing.remote(batch_iter)
    return processed

@ray.remote
def ray_feature_extraction(unit: List[Entry]) -> List[Entry]:
    return feature_extraction(unit)

@ray.remote
def ray_batching(unit: List[Entry]) -> Iterator[Tuple[bool, List[Entry]]]:
    filtered, unfiltered = filtering(unit)
    filtered_iter = iter([(True, filtered)])
    # unfiltered_iter = ((False, batch) for batch in unfiltered)
    return ray.util.iter.from_iterators([
        filtered_iter,
        lambda: ((False, batch) for batch in batching(unfiltered)),
    ])

@ray.remote
def ray_filtered_inference(batches):
    print(batches)
    return batches.for_each(filtered_inference)

def filtered_inference(entry: Tuple[bool, List[Entry]]) -> List[Entry]:
    filtered, batch = entry
    if filtered:
        return batch
    return ray_inference.remote(batch)

@ray.remote
def ray_postprocessing(processed: Iterator[List[List[Entry]]]) -> List[Entry]:
    flattened_processed = [
        entry
        for batch in processed.gather_async()
        for entry in batch
    ]
    return postprocessing_2(flattened_processed)

def postprocessing_2(combined: List[Entry]) -> List[Entry]:
    sleep(0.1)
    return sorted(combined, key=lambda entry: entry.index)
Code
output_unit = ray.get(run_ray.remote(input_unit))
(pid=500989) ParallelIterator[from_iterators[shards=2]]
2021-06-28 16:13:49,836 ERROR worker.py:78 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::ray_postprocessing() (pid=500989, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-199-3481eb10f417>", line 36, in ray_postprocessing
  File "<ipython-input-199-3481eb10f417>", line 39, in <listcomp>
TypeError: 'ray._raylet.ObjectRef' object is not iterable
2021-06-28 16:23:01,492 WARNING worker.py:1114 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. Task ID: ffffffffffffffff614587af9b6d87a0da65a29201000000 Worker ID: 1d7c107e8996c10ec3e0864ad3be9bf25ed086cfc00488a73c4159b2 Node ID: b88e68eafcd151095b489fdef057e7c8e02e9f1d66200c491b7b58b6 Worker IP address: 192.168.1.54 Worker port: 44123 Worker PID: 500992
2021-06-28 16:23:01,493 WARNING worker.py:1114 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. Task ID: ffffffffffffffff608ba955ea7a2b15086d27a601000000 Worker ID: 1fe00124f2331e6f44637a9f2f672cb95a1dc88f0b68ec3fcb7316a9 Node ID: b88e68eafcd151095b489fdef057e7c8e02e9f1d66200c491b7b58b6 Worker IP address: 192.168.1.54 Worker port: 45683 Worker PID: 500991

The problem here is that the object refs produced by calling ray_inference.remote inside filtered_inference are never resolved. Ray only resolves refs that are passed as top-level task arguments, so these refs reach ray_postprocessing still as refs, and an object ref that contains a list cannot be iterated over directly.
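
A small sketch of that resolution rule - add_one and sum_refs are made up for illustration and are not part of the pipeline:

Code
@ray.remote
def add_one(value: int) -> int:
    return value + 1

@ray.remote
def sum_refs(values: List[Any]) -> int:
    # refs nested inside the list arrive unresolved, so an explicit ray.get is needed here
    return sum(ray.get(values))

ref = add_one.remote(1)
ray.get(add_one.remote(ref))          # 3: a top-level ObjectRef argument is resolved for us
ray.get(sum_refs.remote([ref, ref]))  # 4: the refs inside the list stay as refs until ray.get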

Separately, it turns out that the @ray.remote decorator can take a num_returns argument so I might be able to restore the sidestep on the filtering. That would be nice as it would better represent the data flow and it would avoid some repeated computation. I have to push the invocation of the GPU into the filtering code to avoid dealing with the object ref.

Code
@ray.remote
def run_ray(unit: List[Entry]) -> List[Entry]:
    features = ray_feature_extraction.remote(unit)
    filtered, unfiltered = ray_filtering.remote(features)
    # unfiltered = unfiltered.for_each(ray_inference.remote)
    # is an object ref at this point
    processed = ray_postprocessing.remote(filtered, unfiltered)
    return ray.get(processed) # needed to unwrap one layer of remoteness

@ray.remote
def ray_feature_extraction(unit: List[Entry]) -> List[Entry]:
    return feature_extraction(unit)

@ray.remote(num_returns=2)
def ray_filtering(unit: List[Entry]) -> Tuple[List[Entry], List[Entry]]:
    filtered, unfiltered = filtering(unit)
    unfiltered = ray.util.iter.from_iterators([
        lambda: batching(unfiltered)
    ]).for_each(ray_inference.remote)
    return filtered, unfiltered

@ray.remote(num_gpus=1)
def ray_inference(batch: List[Entry]) -> List[Entry]:
    return inference(batch)

@ray.remote
def ray_postprocessing(filtered: List[Entry], processed: Iterator[List[List[Entry]]]) -> List[Entry]:
    # processed is a parallel iterator of object refs, this turns it into a List[List[Entry]]
    vivified_processed = ray.get(list(processed.gather_async()))
    flattened_processed = [
        entry
        for batch in vivified_processed
        for entry in batch
    ]
    return postprocessing(filtered, flattened_processed)
Code
output_unit = ray.get(run_ray.remote(input_unit))
Code
evaluate_multiple(
    unit=input_unit,
    pipeline=run_ray.remote,
    collector=ray.get,
    count=1,
)
True
Code
%%time

evaluate_multiple(
    unit=input_unit,
    pipeline=run_ray.remote,
    collector=ray.get,
    count=7,
)
CPU times: user 323 ms, sys: 97.2 ms, total: 421 ms
Wall time: 21.8 s
True

Remembering that this can’t beat 21.4s, this is a nice improvement. It’s now around a 2% loss due to the ray overhead.

The downside is the pipeline crashes if I use %%timeit!

Code
%%timeit

evaluate_multiple(
    unit=input_unit,
    pipeline=run_ray.remote,
    collector=ray.get,
    count=7,
)
RayTaskError: ray::run_ray() (pid=512060, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-244-ac08555772b5>", line 8, in run_ray
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
ray.exceptions.RayTaskError: ray::ray_postprocessing() (pid=513413, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-244-ac08555772b5>", line 37, in ray_postprocessing
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
ray.exceptions.ObjectLostError: Object 1132cec14d581f81ffffffffffffffffffffffff0100000001000000 is lost due to node failure.
(pid=513413) 2021-06-29 07:38:53,737    WARNING worker.py:1492 -- Local object store memory usage:
(pid=513413) num clients with quota: 0
(pid=513413) quota map size: 0
(pid=513413) pinned quota map size: 0
(pid=513413) allocated bytes: 27675
(pid=513413) allocation limit: 17420428492
(pid=513413) pinned bytes: 15375
(pid=513413) (global lru) capacity: 17420428492
(pid=513413) (global lru) used: 7.06068e-05%
(pid=513413) (global lru) num objects: 4
(pid=513413) (global lru) num evictions: 0
(pid=513413) (global lru) bytes evicted: 0
(pid=513413) 
(pid=513417) 2021-06-29 07:38:53,738    WARNING worker.py:1492 -- Local object store memory usage:
(pid=513417) num clients with quota: 0
(pid=513417) quota map size: 0
(pid=513417) pinned quota map size: 0
(pid=513417) allocated bytes: 27675
(pid=513417) allocation limit: 17420428492
(pid=513417) pinned bytes: 12300
(pid=513417) (global lru) capacity: 17420428492
(pid=513417) (global lru) used: 8.82584e-05%
(pid=513417) (global lru) num objects: 5
(pid=513417) (global lru) num evictions: 0
(pid=513417) (global lru) bytes evicted: 0
(pid=513417) 
2021-06-29 07:38:59,137 ERROR worker.py:78 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::run_ray() (pid=512736, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-244-ac08555772b5>", line 8, in run_ray
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
ray.exceptions.RayTaskError: ray::ray_postprocessing() (pid=513417, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-244-ac08555772b5>", line 37, in ray_postprocessing
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
ray.exceptions.ObjectLostError: Object bbe8890ce390b7fcffffffffffffffffffffffff0100000001000000 is lost due to node failure.

It’s reproducible so there must be something I am doing wrong.

Code
def track_failures(function):
    def wrapper(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except:
            print(f"{function.__name__} failed")
            raise
    return wrapper

@ray.remote
@track_failures
def run_ray(unit: List[Entry]) -> List[Entry]:
    features = ray_feature_extraction.remote(unit)
    filtered, unfiltered = ray_filtering.remote(features)
    # unfiltered = unfiltered.for_each(ray_inference.remote)
    # is an object ref at this point
    processed = ray_postprocessing.remote(filtered, unfiltered)
    return ray.get(processed) # needed to unwrap one layer of remoteness

@ray.remote
@track_failures
def ray_feature_extraction(unit: List[Entry]) -> List[Entry]:
    return feature_extraction(unit)

@ray.remote(num_returns=2)
@track_failures
def ray_filtering(unit: List[Entry]) -> Tuple[List[Entry], List[Entry]]:
    filtered, unfiltered = filtering(unit)
    unfiltered = ray.util.iter.from_iterators([
        lambda: batching(unfiltered)
    ]).for_each(ray_inference.remote)
    return filtered, unfiltered

@ray.remote(num_gpus=1)
@track_failures
def ray_inference(batch: List[Entry]) -> List[Entry]:
    return inference(batch)

@ray.remote
@track_failures
def ray_postprocessing(filtered: List[Entry], processed: Iterator[List[List[Entry]]]) -> List[Entry]:
    # processed is a parallel iterator of object refs, this turns it into a List[List[Entry]]
    # vivified_processed = ray.get(list(processed.gather_async()))
    # flattened_processed = [
    #     entry
    #     for batch in vivified_processed
    #     for entry in batch
    # ]
    flattened_processed = [
        entry
        for batch in processed.gather_async()
        for entry in ray.get(batch)
    ]
    return postprocessing(filtered, flattened_processed)
Code
for _ in range(7):
    evaluate_multiple(
        unit=input_unit,
        pipeline=run_ray.remote,
        collector=ray.get,
        count=7,
    )
2021-06-29 09:35:54,722 WARNING import_thread.py:123 -- The remote function '__main__.wrapper' has been exported 100 times. It's possible that this warning is accidental, but this may indicate that the same remote function is being defined repeatedly from within many tasks and exported to all of the workers. This can be a performance issue and can be resolved by defining the remote function on the driver instead. See https://github.com/ray-project/ray/issues/6240 for more discussion.
(pid=521938) run_ray failed
(pid=523367) run_ray failed
(pid=524009) ray_postprocessing failed
(pid=524006) ray_inference failed
(pid=524015) ray_postprocessing failed
(pid=524014) ray_postprocessing failed
(pid=523373) run_ray failed
(pid=524009) 2021-06-29 09:37:41,158    WARNING worker.py:1492 -- Local object store memory usage:
(pid=524009) num clients with quota: 0
(pid=524009) quota map size: 0
(pid=524009) pinned quota map size: 0
(pid=524009) allocated bytes: 0
(pid=524009) allocation limit: 17420428492
(pid=524009) pinned bytes: 0
(pid=524009) (global lru) capacity: 17420428492
(pid=524009) (global lru) used: 0%
(pid=524009) (global lru) num objects: 0
(pid=524009) (global lru) num evictions: 0
(pid=524009) (global lru) bytes evicted: 0
(pid=524009) 
(pid=524006) 2021-06-29 09:37:41,156    ERROR worker.py:409 -- SystemExit was raised from the worker
(pid=524006) Traceback (most recent call last):
(pid=524006)   File "python/ray/_raylet.pyx", line 595, in ray._raylet.task_execution_handler
(pid=524006)   File "python/ray/_raylet.pyx", line 453, in ray._raylet.execute_task
(pid=524006)   File "python/ray/_raylet.pyx", line 490, in ray._raylet.execute_task
(pid=524006)   File "python/ray/_raylet.pyx", line 497, in ray._raylet.execute_task
(pid=524006)   File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
(pid=524006)   File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
(pid=524006)     return function(*args, **kwargs)
(pid=524006)   File "<ipython-input-284-3a6e7992e7b3>", line 4, in wrapper
(pid=524006)   File "<ipython-input-284-3a6e7992e7b3>", line 37, in ray_inference
(pid=524006)   File "<ipython-input-53-a22bbad22ed2>", line 12, in __call__
(pid=524006)   File "<ipython-input-87-c8e13cb99c50>", line 34, in inference
(pid=524006)   File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/worker.py", line 406, in sigterm_handler
(pid=524006)     sys.exit(1)
(pid=524006) SystemExit: 1
(pid=524015) 2021-06-29 09:37:41,171    WARNING worker.py:1492 -- Local object store memory usage:
(pid=524015) num clients with quota: 0
(pid=524015) quota map size: 0
(pid=524015) pinned quota map size: 0
(pid=524015) allocated bytes: 0
(pid=524015) allocation limit: 17420428492
(pid=524015) pinned bytes: 0
(pid=524015) (global lru) capacity: 17420428492
(pid=524015) (global lru) used: 0%
(pid=524015) (global lru) num objects: 0
(pid=524015) (global lru) num evictions: 0
(pid=524015) (global lru) bytes evicted: 0
(pid=524015) 
(pid=524014) 2021-06-29 09:37:41,161    WARNING worker.py:1492 -- Local object store memory usage:
(pid=524014) num clients with quota: 0
(pid=524014) quota map size: 0
(pid=524014) pinned quota map size: 0
(pid=524014) allocated bytes: 0
(pid=524014) allocation limit: 17420428492
(pid=524014) pinned bytes: 0
(pid=524014) (global lru) capacity: 17420428492
(pid=524014) (global lru) used: 0%
(pid=524014) (global lru) num objects: 0
(pid=524014) (global lru) num evictions: 0
(pid=524014) (global lru) bytes evicted: 0
(pid=524014) 
RayTaskError: ray::wrapper() (pid=521938, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-284-3a6e7992e7b3>", line 4, in wrapper
  File "<ipython-input-284-3a6e7992e7b3>", line 18, in run_ray
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
ray.exceptions.RayTaskError: ray::wrapper() (pid=524009, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-284-3a6e7992e7b3>", line 4, in wrapper
  File "<ipython-input-284-3a6e7992e7b3>", line 49, in ray_postprocessing
  File "<ipython-input-284-3a6e7992e7b3>", line 52, in <listcomp>
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
ray.exceptions.ObjectLostError: Object 9e0ac0d915de549cffffffffffffffffffffffff0100000001000000 is lost due to node failure.
2021-06-29 09:37:41,263 WARNING worker.py:1114 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. Task ID: 9e0ac0d915de549cffffffffffffffffffffffff01000000 Worker ID: 16c432c3d1b794fafb9a3f995bd52408672137af0324892e2b6f6109 Node ID: 21326df2c33d8228388387e2e8eae95d84664671749601d408e50621 Worker IP address: 192.168.1.54 Worker port: 40219 Worker PID: 524006
2021-06-29 09:37:46,786 ERROR worker.py:78 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::wrapper() (pid=523373, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-284-3a6e7992e7b3>", line 4, in wrapper
  File "<ipython-input-284-3a6e7992e7b3>", line 18, in run_ray
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
ray.exceptions.RayTaskError: ray::wrapper() (pid=524015, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-284-3a6e7992e7b3>", line 4, in wrapper
  File "<ipython-input-284-3a6e7992e7b3>", line 49, in ray_postprocessing
  File "<ipython-input-284-3a6e7992e7b3>", line 52, in <listcomp>
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
ray.exceptions.ObjectLostError: Object e9c2dc4e4da655d2ffffffffffffffffffffffff0100000001000000 is lost due to node failure.
2021-06-29 09:37:46,787 ERROR worker.py:78 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::wrapper() (pid=523367, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-284-3a6e7992e7b3>", line 4, in wrapper
  File "<ipython-input-284-3a6e7992e7b3>", line 18, in run_ray
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
ray.exceptions.RayTaskError: ray::wrapper() (pid=524014, ip=192.168.1.54)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-284-3a6e7992e7b3>", line 4, in wrapper
  File "<ipython-input-284-3a6e7992e7b3>", line 49, in ray_postprocessing
  File "<ipython-input-284-3a6e7992e7b3>", line 52, in <listcomp>
  File "/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.8/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
ray.exceptions.ObjectLostError: Object 8c2fd1a33709485fffffffffffffffffffffffff0100000001000000 is lost due to node failure.

So the underlying problem is with the parallel iterator. I’m thinking that it might be better to refactor the code again anyway: I really only need the GPU code to run in a separate process, so the pipeline should contain all of the other code inline and only use ray to invoke the GPU.

Code
@ray.remote
def run_ray(unit: List[Entry]) -> List[Entry]:
    features = feature_extraction(unit)
    filtered, unfiltered = filtering(features)
    if unfiltered:
        batches = batching(unfiltered)
        batch_remotes = [
            ray_inference.remote(batch)
            for batch in batches
        ]
        for idx in range(5):
            try:
                processed_batches = ray.get(batch_remotes)
                break
            except Exception as e:
                print("vivifying processed batches failed", e)
                if idx == 4:
                    raise e
        processed = [
            entry
            for batch in processed_batches
            for entry in batch
        ]
    else:
        processed = []

    return postprocessing(filtered, processed)

@ray.remote(num_gpus=1)
def ray_inference(batch: List[Entry]) -> List[Entry]:
    return inference(batch)
Code
evaluate_multiple(
    unit=input_unit,
    pipeline=run_ray.remote,
    collector=ray.get,
    count=1,
)
True
Code
%%time

evaluate_multiple(
    unit=input_unit,
    pipeline=run_ray.remote,
    collector=ray.get,
    count=7,
)
CPU times: user 360 ms, sys: 62.9 ms, total: 422 ms
Wall time: 21.5 s
True

So this did have a problem with StopIteration the first time I ran it. This time it has completed successfully. I’ve added a loop to retry the vivification.

Still, this time is incredible. I said that 21.4s was the theoretical minimum and this has achieved 21.5s.

Code
%%timeit

evaluate_multiple(
    unit=input_unit,
    pipeline=run_ray.remote,
    collector=ray.get,
    count=7,
)
21.5 s ± 1.59 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

This seems to be a reliable pipeline with a great time. That extra 0.1 s is about a 0.5% overhead for ray.

I’m very happy with this evaluation so far; the only downside is the trouble with realising the remote values. Now I need to try implementing all this for real and see how it performs.