style="display:inline-block;width:728px;height:90px"
data-ad-client="ca-pub-5164839828746352"
data-ad-slot="7563230308">

Usando el ListeningExecutorService de Guava

Antes de conociera algo de Hystrix en una aplicacion que mantengo tuvimos varios problemas con un servicio de notificaciones. Este servicio dependia de otro servidor que, como todo, a veces fallaba.

El detalle es que el proceso de notificación se hacía durante el proceso principal de la aplicacion; cuando se realizaban las llamadas correspondientes al servicio el proceso se detiene o lanzaba excepción... pero esto era en el mejor de los casos, también ocurrio que no especificamos timeouts para el servicio y por lo tanto el proceso principal de la aplicacion podia quedarse varado hasta por un día (que era el timeout por defecto del socket que se abría por medio de otra libreria).

En fin, el perder las notificaciones no era opción y tampoco lo era detener el proceso hasta que estas notificaciones se entregaran. Por lo tanto, optamos por hacer algo como esto:

Un servicio que a veces falla

Supongamos que tenemos un servicio; que de vez en cuando da problemas:

public interface FaultyService<Input, Output> {

    Output process(Input input);

}

Ahora, este servicio es totalmente síncrono y, para fines de este post, ¡No queremos que lo sea! Entonces, creamos una nueva interface.

Un servicio que a veces falla, pero asincrono... o eso parece

public interface AsyncFaultyService<Input, Output> {

    Future<Output> process(Input input);

}

Y pues para poder usar un servicio no sincrono, usamos el patron de diseño adapter para realizar algo similar a esto:

public class AsyncFaultyServiceAdapter<Input, Output>
        implements AsyncFaultyService<Input, Output> {

    private final FaultyService<Input, Output> faultyService;
    private final ExecutorService executorService;

    public AsyncFaultyServiceAdapter(FaultyService<Input, Output> faultyService,
                                     ExecutorService executorService) {
        this.faultyService = faultyService;
        this.executorService = executorService;
    }

    @Override
    public Future<Output> process(Input input) {
        return executorService.submit(() -> faultyService.process(input));
    }
}

Y entonces facilmente podriamos mandar a ejecutar el servicio y después obtener el resultado usando el Future que nos regresa; aplicando el timeout usando el metodo Future.get(long, TimeUnit) y si llega a pasar esto, deberíamos interrumpir el hilo que ejecuta esa llamada con Future.cancel(boolean) pasando como parametro true. E.g.

// given

AsyncFaultyService<FTPFile, File> asyncFtpDownloadService = new AsyncFaultyServiceAdapter<>(
                faultyFtpDownloadService, Executors.newCachedThreadPool());

// when
Future<File> localFileFuture = asyncFtpDownloadService.process(remoteFile);

// then
try {
    File localFileToProcess = localFileFuture.get(5, MINUTES);
} catch (ExecutionException | TimeoutException e) {
    localFileFuture.cancel(true);
    // Recuperar: reintentando o con fallback, quiza?
} catch (InterruptedException e) {
    // loggear y quizas hacer limpieza
}

Pero si nos damos cuenta, no hay mucha diferencia entre hacer esto y ejecutar faultyFtpDownloadService.process(remoteFile) directamente; excepto por el aplicar el timeout. Esto es porque aunque ese bloque de código se ejecuta en otro hilo, tenemos que bloquear el hilo actual (u otro) en Future.get(long, TimeUnit).

Un servicio que a veces falla, asíncrono y con callbacks que no bloquean

Para no bloquear el hilo que hace las llamadas, Guava nos ofrece un decorador para nuestros ExecutorService: ListeningExecutorService.

Al usar un ListeningExecutorService, en lugar de obtener instancias de Future, obtenemos instancias de ListenableFuture; el cual nos permite ejecutar codigo en los siguientes escenarios:

  • Cuando la computacion que se ejecuto termina satisfactoriamente; regresando un valor.
  • Cuando la computacion termina excepcionalmente i.e. lanzo una instancia de Throwable.

Ahora, si inspeccionamos la clase ListenableFuture, veremos el metodo addListener(Runnable, Executor). Pero ustedes se preguntaran ¿Como obtenemos el valor o la excepción si el listener es un Runnable? Pues usando una clase de utileria de Guava:

Futures.addCallback(listenableFuture,
    new FutureCallback<Object>() {
        @Override
        public void onSuccess(Output result) {
            // Esto se ejecuta si el calculo regresa satisfactoriamente
        }

        @Override
        public void onFailure(Throwable t) {
            // Esto se ejecuta si el calculo falla,
            // el argumento es la excepcion lanzada
        }
    },
    // El executor donde se ejecutara el callback,
    // si no se especifica, sera en el mismo hilo que ejecuto el calculo
    someExecutorService);

Entonces ya armados con esto, podemos hacer un decorador para el servicio que nos permita agregar listeners (porque puedes agregar mas de uno) que se ejecuten en un Executor al completar la llamada al servicio

public interface ListeningAsyncFaultyService<Input, Output>
        extends AsyncFaultyService<Input, Output> {

    ListenableFuture<Output> process(Input input);

}

public class ListeningAsyncFaultyServiceAdapter<Input, Output> implements
        ListeningAsyncFaultyService<Input, Output> {

    private final FaultyService<Input, Output> faultyService;
    private final ListeningExecutorService executorService;

    public ListeningAsyncFaultyServiceAdapter(
            FaultyService<Input, Output> faultyService,
            ExecutorService executorService) {

        this.faultyService = faultyService;
        this.executorService = MoreExecutors
                .listeningDecorator(executorService);
    }

    @Override
    public ListenableFuture<Output> process(Input input) {
        return executorService.submit(() -> faultyService.process(input));
    }

}

Y usarlo para nuestros fines

 // given
ListeningAsyncFaultyService<FTPFile, File> listenableService = new ListeningAsyncFaultyServiceAdapter(
                faultyFtpDownloadService, Executors.newCachedThreadPool());
       
// when
ListenableFuture<File> listenableFuture = listenableService.process(remoteFile);

// then
Futures.addCallback(listenableFuture,
    new FutureCallback<File>() {
        @Override
        public void onSuccess(File fileDownloaded) {
            process(fileDownloaded);
        }

        @Override
        public void onFailure(Throwable t) {
            // Fallback?
        }
    },
    someExecutorService);

Lo que falto de ver

Si este tema les parecio interesante, hay muchas mas que puedes hacer con los ListenableFutures. El wiki de Guava explica todo esto, entre lo que puedes encontrar esta como transformar futuros (i.e. composicion) aplicando una funcion al valor que regresan; agregar varios futuros del mismo tipo en uno solo que regresa una lista; obtener solo los resultados de aquellos futuros que terminaron correctamente y un largo etc. etc.

style="display:inline-block;width:728px;height:90px"
data-ad-client="ca-pub-5164839828746352"
data-ad-slot="7563230308">