Escribir operadores personalizados

Aunque RxPHP proporciona más de 40 operadores para que usemos, a veces, puede ser necesario usar un operador que no existe. Considere el siguiente caso:

<?php
require_once DIR . '/vendor/autoload.php';
use \Rx\Observer\CallbackObserver;
use \React\EventLoop\Factory;
use \Rx\Scheduler;
$loop = Factory::create();
Scheduler::setDefaultFactory(function () use ($loop) {
return new Scheduler\EventLoopScheduler($loop);
});
// correct
$users = serialize(['John', 'Mariya', 'Marc', 'Lucy']);
// faulty
// $users = str_replace('i:', '', serialize(['John', 'Mariya', 'Marc',
'Lucy']));
$observer = new CallbackObserver(
function ($value) {
echo 'Observer.$onNext: ', print_r($value, true), PHP_EOL;
},
function (\Throwable $t) {
echo 'Observer.$onError: ', $t->getMessage(), PHP_EOL;
},
function () {
echo 'Observer.$onCompleted', PHP_EOL;
}
);
Rx\Observable::just($users)
->map(function ($value) {
return unserialize($value);
})
->subscribe($observer);
$loop->run();

La ejecución de este código con la variable $users correcta nos da el siguiente resultado esperado:

Sin embargo, si tuviéramos que eliminar el comentario delante de la variable $user defectuosa, el resultado saldrá un poco inesperado, o al menos no cómo nos gustaría manejarlo:

Lo que realmente queremos es cambiar la lógica de deserialización al operador RxPHP y hacer que maneje con gracia los intentos fallidos de deserialización(). Afortunadamente, escribir un operador personalizado es una tarea fácil. Un vistazo rápido al archivo vendor/reactivex/rxphp/src/Operator/OperatorInterface.php revela la siguiente interfaz:

<?php
declare(strict_types=1);
namespace Rx\Operator;
use Rx\DisposableInterface;
use Rx\ObservableInterface;
use Rx\ObserverInterface;
interface OperatorInterface
{
public function __invoke(
ObservableInterface $observable,
ObserverInterface $observer
): DisposableInterface;
}

Es bastante fácil, la interfaz solo requiere una única implementación del método __invoke(). Este método se llama cuando intentamos llamar a un objeto como una función. OperatorInterface, en este caso, enumera tres argumentos para el método __invoke(), dos de los cuales son obligatorios:

  • $observable: esta será nuestra entrada observable a la que nos suscribiremos
  • $observador: aquí es donde emitiremos nuestro valor de salida

Con eso en mente, la siguiente es una implementación de nuestro custom UnserializeOperator:

<?php
use \Rx\DisposableInterface;
use \Rx\ObservableInterface;
use \Rx\ObserverInterface;
use \Rx\SchedulerInterface;
use \Rx\Observer\CallbackObserver;
use \Rx\Operator\OperatorInterface;
class UnserializeOperator implements OperatorInterface
{
/**
*@param \Rx\ObservableInterface $observable
*@param \Rx\ObserverInterface $observer
*@param \Rx\SchedulerInterface $scheduler
*@return \Rx\DisposableInterface
*/
public function __invoke(
ObservableInterface $observable,
ObserverInterface $observer,
SchedulerInterface $scheduler = null
): DisposableInterface
{
$callbackObserver = new CallbackObserver(
function ($value) use ($observer) {
if ($unsValue = unserialize($value)) {
$observer->onNext($unsValue);
} else {
$observer->onError(
new InvalidArgumentException('Faulty serialized
string.')
);
}
},
function ($error) use ($observer) {
$observer->onError($error);
},
function () use ($observer) {
$observer->onCompleted();
}
);
// ->subscribe(…) => DisposableInterface
return $observable->subscribe($callbackObserver, $scheduler);
}
}

Desafortunadamente, no podemos encadenar a nuestro operador directamente como encadenamos a los operadores RxPHP. Necesitamos ayudarnos con el operador lift():

Rx\Observable::just($users)
->lift(function () {
return new UnserializeOperator();
})
->subscribe($observer);

Con UnserializeOperator en su lugar, la cadena $users serializada defectuosa ahora da el siguiente resultado:

Nuestro operador ahora está manejando con éxito los errores, ya que los está delegando en la devolución de llamada del observador onError.

Aprovechar al máximo RxPHP se trata principalmente de conocer los entresijos de sus operadores. El directorio vendor/reactivex/rxphp/demo/ proporciona bastantes ejemplos de uso del operador. Vale la pena pasar un tiempo revisando cada uno.

Comparte