Log in
Seblog.nl

#laravel

als antwoord op sebsel

Thanks @github.com/lootjs, I have seen Hub before, and I thought it did something else. I tried again, hoping that I missed something, but I don't think this is what I need here.

I think the idea of the Hub is that one can have multiple pipelines, with names, and that you can dispatch to those pipelines by name. In my example above, I have one pipeline and want to reuse that one pipeline. That does not match.

Example for how I think Hub works:

function prependFoo($thing, $next) {
    return $next('foo' . $thing);
}

function appendBar($thing, $next) {
    return $next($thing . 'bar');
}

$hub = new Illuminate\Pipeline\Hub();

$hub->defaults(function ($pipeline, $thing) {
    return $pipeline->send($thing)
        ->through(['prependFoo', 'appendBar'])
        ->then(fn($n) => $n);
});

$hub->pipeline('only-foo', function ($pipeline, $thing) {
    return $pipeline->send($thing)
        ->through(['prependFoo'])
        ->then(fn($n) => $n);
});

$hub->pipe('thing'); // "foothingbar"
$hub->pipe('thing', 'only-foo'); // "foothing"

Note that Hub does not it's Container to resolve Pipeline (just news it), and that it gives a new instance of Pipeline every time you call pipe() on it, which is quite the opposite of what I want.

Hub also seems to be a bit forgotten: it is not used by the framework itself, is not documented and does not have tests. It feels like an experiment for Middleware groups that went nowhere? (But that's speculating, probably @taylorotwell knows.)

But most of all: no, I don't think that does what I mean.

[laravel/ideas] Calling a Pipeline multiple times

So, currently the Illuminate\Pipeline\Pipeline is used for Route Middleware and Job Middleware. In both cases, the Pipeline is used once: you new up an Instance and pass it the Request or Job, then use the result. It is clearly optimized for that use case.

I have a use-case in which I would like to call a single Pipeline (consisting of the same pipes) over and over again. When I used the Pipeline as is, I was surprised to learn that each time I run the pipeline with ->then(), I would get new instances of my Middleware. An example:

$pipeline = resolve(Pipeline::class)
    ->through($pipes);

$handle = fopen('logs.txt', 'r');
while ($line = fgets($handle)) {
    $result = $pipeline
        ->send($line)
        ->then(fn($line) => $line);
    // ...
}

This will create new instances of all $pipes for each line in the file. This is undesirable if you want to do something with the lines based on a state of the pipe. As a silly example:

class LogRowCounter
{
    protected $counter = 0;

    public function handle($line, $next)
    {
        $this->counter++;

        return $next($this->counter . ' - ' . $line);
    }
}

Apart from resolving all the $pipes, a lot of checks are also done while running the pipeline, meaning it happens in the while loop of my example. Some of those things could be done beforehand.

Reusable Pipeline

To make the Pipeline reusable, I would propose a new method called prepare(). (I was also thinking about compile(), but ‘prepare’ is used in some protected methods of Pipeline already.) It would extract running the array_reduce() from then() and return the resulting closure, and then() can call it.

    public function prepare(Closure $destination)
    {
        $pipeline = array_reduce(
            array_reverse($this->pipes()), $this->carry(), $this->prepareDestination($destination)
        );

        return $pipeline;
    }

    public function then(Closure $destination)
    {
        $pipeline = $this->prepare($destination);

        return $pipeline($this->passable);
    }

This would still resolve all the pipes on ‘runtime’ though, because of how the closures are structured. Here is a new version of carry(), which resolves the pipes outside of the returned closure. The outer closure returned is meant for array_reduce(), like the current version; didn't want to refactor that.

    protected function carry()
    {
        return function ($stack, $pipe) {
            if (is_string($pipe)) {
                [$name, $parameters] = $this->parsePipeString($pipe);

                $pipe = $this->getContainer()->make($name);
            } else {
                $parameters = [];
            }

            return function ($passable) use ($stack, $pipe, $parameters) {
                try {
                    if (is_callable($pipe)) {
                        return $pipe($passable, $stack);
                    }

                    $parameters = array_merge([$passable, $stack], $parameters);

                    $carry = method_exists($pipe, $this->method)
                                    ? $pipe->{$this->method}(...$parameters)
                                    : $pipe(...$parameters);

                    return $this->handleCarry($carry);
                } catch (Exception $e) {
                    return $this->handleException($passable, $e);
                } catch (Throwable $e) {
                    return $this->handleException($passable, new FatalThrowableError($e));
                }
            };
        };
    }

All Pipeline tests pass, and the following new test would pass too:

    public function testPipelinePrepared()
    {
        $pipeline = (new Pipeline(new Container))
                    ->through([
                        PipelineTestPipeOne::class,
                        PipelineTestStatefulPipe::class
                    ])
                    ->prepare(function ($piped) {
                        return $piped;
                    });

        $result = $pipeline('foo');

        $this->assertSame('foo', $result);
        $this->assertSame('foo', $_SERVER['__test.pipe.one']);
        $this->assertSame([], $_SERVER['__test.pipe.state']);

        $result = $pipeline('bar');

        $this->assertSame('bar', $result);
        $this->assertSame('bar', $_SERVER['__test.pipe.one']);
        $this->assertSame([0, 1], $_SERVER['__test.pipe.state']);

        unset($_SERVER['__test.pipe.one']);
        unset($_SERVER['__test.pipe.state']);
    }
}

class PipelineTestStatefulPipe
{
    protected $count = 0;

    public function handle($piped, $next)
    {
        $_SERVER['__test.pipe.state'][] = $this->count++;

        return $next($piped);
    }
}

Things to note

First off: I’ve taken the liberty to rename the ! is_object($pipe) check to is_string($pipe), because the very next thing we do is call $this->parsePipeString($pipe) with it.

Because of the way the closure is bound, the following code would work, and it seems like I can put it to use, but it looks wonky and weird too:

$filename = 'logs.txt';
$pipeline = resolve(Pipeline::class)
    ->through($pipes);

$run = $pipeline->prepare(fn($line) => $line);

// Call all `init($filename, $next)` methods
$pipeline->via('init');
$run($filename);

// Set method back to 'handle'
$pipeline->via('handle');

$handle = fopen($filename, 'r');
while ($line = fgets($handle)) {
    // Call all `handle($line, $next)` methods
    $result = $run($line);

    // ...
}

… that syntax makes me feel like Pipeline should store the Closure as a property somewhere, so prepare() returns $this, and then add an __invoke() or run() to Pipeline, so you can at least use the same variable instead of $pipeline and $run.

Alternative solutions

While we could do the above, I also have a different solution: singletons in the Container.

I originally rejected this (hence the work above) because I don’t want my stateful pipes to be global singletons. I want the pipes to belong to the Pipeline: if the Pipeline goes, so go the references to the pipes.

Think for example of the above file-handling example, but as a queued Job: I don’t want the state of the pipes from one run to be around when I process the next log-file.

But: this last point can also be resolved by creating a new Container instance per Job, and giving that to the Pipeline while constructing it. You can bind singletons into that separate container, which will belong to the pipeline.

Note that in that case – if we don’t change the Pipeline class – you’re still making the calls to the Container on ‘runtime’, which is not optimal. On the other hand, it might not be a very big deal.

I don't really like the way I have to new-up or resolve the Pipeline in this case, but it is doable.


I’d be happy to work this into a PR, but I thought this might be a better start, especially since I’m still debating whether or not I got a point here.

Thanks for reading, I’m curious about your thoughts.