Operator

El modelo observable de RxPHP nos permite tratar flujos con operaciones simples y componibles. Cada una de estas operaciones es realizada por un operador individual. La composición de los operadores es posible porque los propios operadores en su mayoría devuelven observables como resultado de su operación.

Un vistazo rápido al proveedor \reactivex\rxphp\lib\Rx\Operator el directorio revela 48 implementaciones de operadores diferentes, clasificadas en varias categorías

  • Creando o
  • Transformando observables
  • Filtrado de observables
  • Combinando observables
  • Operadores de manejo de errores
  • Operadores de servicios públicos observables
  • Operadores condicionales y booleanos
  • Operadores matemáticos y agregados
  • Operadores observables conectables

Los métodos de map, filter y reduce son probablemente los operadores más conocidos y populares, así que comencemos nuestro ejemplo con ellos:

<?php
require_once DIR . '/vendor/autoload.php';
use \Rx\Observable;
use \Rx\Observer\CallbackObserver;
use \React\EventLoop\Factory;
use \Rx\Scheduler;
$loop = Factory::create();
Scheduler::setDefaultFactory(function () use ($loop) {
return new Scheduler\EventLoopScheduler($loop);
});
// Generator function
function xrange($start, $end, $step = 1)
{
for ($i = $start; $i <= $end; $i += $step) {
yield $i;
}
}
// Observer
$observer = new CallbackObserver(
function ($item) {
echo $item, PHP_EOL;
}
);
echo 'start', PHP_EOL;
// Observable stream, made from iterator/generator
Observable::fromIterator(xrange(1, 10, 1))
->map(function ($item) {
return $item * 2;
})
->filter(function ($item) {
return $item % 3 == 0;
})
->reduce(function ($x, $y) {
return $x + $y;
})
->subscribe($observer);
echo 'end', PHP_EOL;
$loop->run();

Comenzamos escribiendo una función de generador simple llamada xrange(). La belleza del generador aquí es que la función xrange() siempre tomará la misma cantidad de memoria, independientemente del rango que elijamos. Esto nos da una excelente base para jugar con los operadores ReactiveX. Luego creamos un simple $observador, utilizando solo su $onNext invocable mientras ignoramos los $onError y $onCompleted invocables para este propósito.
Luego creamos una secuencia observable a partir de nuestra función xrange(), pasándole un rango de 1 a 20. Finalmente, llegamos al punto en el que conectamos el método map(), filter(), reduce() y subscribe() llama a nuestra instancia observable.
Si tuviéramos que ejecutar este código ahora, el resultado sería el número 36. Para entender de dónde viene esto, demos un paso atrás y comentemos los métodos filter() y reduce():

Observable::fromIterator(xrange(1, 10, 1))
->map(function ($item) {
return $item * 2;
})
// ->filter(function ($item) {
// return $item % 3 == 0;
// })
// ->reduce(function ($x, $y) {
// return $x + $y;
// })
->subscribe($observer);

El resultado ahora es el siguiente:

start
2
4
6
8
10
12
14
16
18
20
end

La función map() transforma los elementos emitidos aplicando una función a cada elemento. En este caso, esa función es $ item*2. Ahora, avancemos y restauremos la función filter(), pero dejemos la función reduce() comentada:

Observable::fromIterator(xrange(1, 10, 1))
->map(function ($item) {
return $item * 2;
})
->filter(function ($item) {
return $item % 3 == 0;
})
// ->reduce(function ($x, $y) {
// return $x + $y;
// })
->subscribe($observer);

Knowing now that the filter() function will receive the map() function output stream (2,
4, 6, … 20), we observe the following output:

start
6
12
18
end

La función filter() transforma los elementos emitidos emitiendo solo aquellos elementos que pasan una prueba de predicado. En este caso, la prueba de predicado es $item% 3 == 0, lo que significa que devuelve ítems divisibles por 3.
Finalmente, si restauramos la función reduce(), el resultado regresa a 36. A diferencia de map() y filter(), que acepta un solo valor de elemento emitido, la devolución de llamada de función reduce() acepta dos valores.
Un cambio rápido en el cuerpo de la devolución de llamada reduce() aclara lo que está sucediendo:

->reduce(function ($x, $y) {
$z = $x + $y;
echo '$x: ', $x, PHP_EOL;
echo '$y: ', $y, PHP_EOL;
echo '$z: ', $z, PHP_EOL, PHP_EOL;
return $z;
})

Esto da salida de la siguiente manera:

start
$x: 6
$y: 12
$z: 18
$x: 18
$y: 18
$z: 36
36
end

Podemos ver que $x aparece como un valor del primer elemento emitido, mientras que $y aparece como un valor del segundo elemento emitido. La función luego aplica el cálculo de la suma en ellos, haciendo que el resultado devuelto sea ahora un primer elemento emitido en la segunda iteración, básicamente, dando
(6 + 12) => 18 => (18 + 18) => 36.

Dada la gran cantidad de operadores compatibles con RxPHP, podemos imaginar las complejidades de la vida real que podemos resolver de una manera elegante simplemente componiendo una serie de operadores en una cadena, de la siguiente manera:

$observable
->operator1(function () { /* …/ })
->operator2(function () { // })
->operator3(function () { // })
// …
->operatorN(function () { / …*/ })
->subscribe($observer);

Si los operadores existentes no son suficientes, podemos escribir fácilmente el nuestro extendiendo Rx\Operator\OperatorInterface.

Comparte