Log in
Seblog.nl

Seblog.nl

I’m in the train between Leiden and The Hague and for almost the full ten minutes of that trip, two trains ride next to each other at about the same speed. Very cinematic to see. (Although I admit this video has too much reflection to be truely cinematic.)

Ik, terwijl ik mijn veel te warme laptop na een nacht van de lader haal: ‘Wat heb jij zitten doen? Jij hebt naar mijn foto’s zitten kijken hè?’ Laptop zegt niets terug.

[laravel/ideas] Calling a Pipeline multiple times

So, currently the Illuminate\Pipeline\Pipeline is used for Route Middleware and Job Middleware. In both cases, the Pipeline is used once: you new up an Instance and pass it the Request or Job, then use the result. It is clearly optimized for that use case.

I have a use-case in which I would like to call a single Pipeline (consisting of the same pipes) over and over again. When I used the Pipeline as is, I was surprised to learn that each time I run the pipeline with ->then(), I would get new instances of my Middleware. An example:

$pipeline = resolve(Pipeline::class)
    ->through($pipes);

$handle = fopen('logs.txt', 'r');
while ($line = fgets($handle)) {
    $result = $pipeline
        ->send($line)
        ->then(fn($line) => $line);
    // ...
}

This will create new instances of all $pipes for each line in the file. This is undesirable if you want to do something with the lines based on a state of the pipe. As a silly example:

class LogRowCounter
{
    protected $counter = 0;

    public function handle($line, $next)
    {
        $this->counter++;

        return $next($this->counter . ' - ' . $line);
    }
}

Apart from resolving all the $pipes, a lot of checks are also done while running the pipeline, meaning it happens in the while loop of my example. Some of those things could be done beforehand.

Reusable Pipeline

To make the Pipeline reusable, I would propose a new method called prepare(). (I was also thinking about compile(), but ‘prepare’ is used in some protected methods of Pipeline already.) It would extract running the array_reduce() from then() and return the resulting closure, and then() can call it.

    public function prepare(Closure $destination)
    {
        $pipeline = array_reduce(
            array_reverse($this->pipes()), $this->carry(), $this->prepareDestination($destination)
        );

        return $pipeline;
    }

    public function then(Closure $destination)
    {
        $pipeline = $this->prepare($destination);

        return $pipeline($this->passable);
    }

This would still resolve all the pipes on ‘runtime’ though, because of how the closures are structured. Here is a new version of carry(), which resolves the pipes outside of the returned closure. The outer closure returned is meant for array_reduce(), like the current version; didn't want to refactor that.

    protected function carry()
    {
        return function ($stack, $pipe) {
            if (is_string($pipe)) {
                [$name, $parameters] = $this->parsePipeString($pipe);

                $pipe = $this->getContainer()->make($name);
            } else {
                $parameters = [];
            }

            return function ($passable) use ($stack, $pipe, $parameters) {
                try {
                    if (is_callable($pipe)) {
                        return $pipe($passable, $stack);
                    }

                    $parameters = array_merge([$passable, $stack], $parameters);

                    $carry = method_exists($pipe, $this->method)
                                    ? $pipe->{$this->method}(...$parameters)
                                    : $pipe(...$parameters);

                    return $this->handleCarry($carry);
                } catch (Exception $e) {
                    return $this->handleException($passable, $e);
                } catch (Throwable $e) {
                    return $this->handleException($passable, new FatalThrowableError($e));
                }
            };
        };
    }

All Pipeline tests pass, and the following new test would pass too:

    public function testPipelinePrepared()
    {
        $pipeline = (new Pipeline(new Container))
                    ->through([
                        PipelineTestPipeOne::class,
                        PipelineTestStatefulPipe::class
                    ])
                    ->prepare(function ($piped) {
                        return $piped;
                    });

        $result = $pipeline('foo');

        $this->assertSame('foo', $result);
        $this->assertSame('foo', $_SERVER['__test.pipe.one']);
        $this->assertSame([], $_SERVER['__test.pipe.state']);

        $result = $pipeline('bar');

        $this->assertSame('bar', $result);
        $this->assertSame('bar', $_SERVER['__test.pipe.one']);
        $this->assertSame([0, 1], $_SERVER['__test.pipe.state']);

        unset($_SERVER['__test.pipe.one']);
        unset($_SERVER['__test.pipe.state']);
    }
}

class PipelineTestStatefulPipe
{
    protected $count = 0;

    public function handle($piped, $next)
    {
        $_SERVER['__test.pipe.state'][] = $this->count++;

        return $next($piped);
    }
}

Things to note

First off: I’ve taken the liberty to rename the ! is_object($pipe) check to is_string($pipe), because the very next thing we do is call $this->parsePipeString($pipe) with it.

Because of the way the closure is bound, the following code would work, and it seems like I can put it to use, but it looks wonky and weird too:

$filename = 'logs.txt';
$pipeline = resolve(Pipeline::class)
    ->through($pipes);

$run = $pipeline->prepare(fn($line) => $line);

// Call all `init($filename, $next)` methods
$pipeline->via('init');
$run($filename);

// Set method back to 'handle'
$pipeline->via('handle');

$handle = fopen($filename, 'r');
while ($line = fgets($handle)) {
    // Call all `handle($line, $next)` methods
    $result = $run($line);

    // ...
}

… that syntax makes me feel like Pipeline should store the Closure as a property somewhere, so prepare() returns $this, and then add an __invoke() or run() to Pipeline, so you can at least use the same variable instead of $pipeline and $run.

Alternative solutions

While we could do the above, I also have a different solution: singletons in the Container.

I originally rejected this (hence the work above) because I don’t want my stateful pipes to be global singletons. I want the pipes to belong to the Pipeline: if the Pipeline goes, so go the references to the pipes.

Think for example of the above file-handling example, but as a queued Job: I don’t want the state of the pipes from one run to be around when I process the next log-file.

But: this last point can also be resolved by creating a new Container instance per Job, and giving that to the Pipeline while constructing it. You can bind singletons into that separate container, which will belong to the pipeline.

Note that in that case – if we don’t change the Pipeline class – you’re still making the calls to the Container on ‘runtime’, which is not optimal. On the other hand, it might not be a very big deal.

I don't really like the way I have to new-up or resolve the Pipeline in this case, but it is doable.


I’d be happy to work this into a PR, but I thought this might be a better start, especially since I’m still debating whether or not I got a point here.

Thanks for reading, I’m curious about your thoughts.

‪Ik heb één klok in mijn huis die niet zelf naar wintertijd gaat. Het was leuk om te zien hoe laat het ‘eigenlijk’ was, vond ik, voor een paar weken. Nu heb ik ‘m op lokale zonnetijd gezet, dat voelt echter.‬

Really liking the arrow functions in PHP 7.4 so far! Unfortunately already found one missing feature: you can't throw Exceptions from them. The following results in a 'unexpected T_THROW' syntax error:

fn() => throw new Exception('nope');

Would've cleaned up my test, but alas.

De oorkussens van mijn Bose QuietComfort 35 II waren na twee jaar dagelijks gebruik echt helemaal versleten en aan één kant zelfs opengesprongen. Jammer, want het is echt een geweldige koptelefoon. Maar gelukkig kon ik nieuwe oorkussens bestellen, en die zijn net binnen.

Nieuwe dingen kopen geeft altijd een korte kick, maar het zorgvuldig oppoetsen en het vervangen van kapotte onderdelen van een ding dat je dagelijks gebruikt voelt veel fijner. Hij is weer echt als nieuw. Op naar de volgende twee jaar.

Disney+’s algorithm still hasn’t figured out I’m binge watching all Marvel Cinematic Universe movies in chronological order. Luckily there is Wikipedia for the list.

Meer laden