Cómo leer registros de base de datos como un Stream de Java

He estado usando Spring desde la versión 1.0 y una de las clases que siempre me han resultado extremadamente útiles es JdbcTemplate. Esperaba que para la versión 5 integrara funcionalidad del API de stream de Java, pero no fue así.

Sin embargo a veces necesito realizar búsquedas en base de datos que devuelven miles o incluso millones de registros, y no puedo usar los métodos de JdbcTemplate que devuelven listas porque me quedo sin memoria. Los métodos que usan RowCallbackHandler son más apropiados, pero sería mucho más conveniente poder usar Streams de Java, particularmente si se pueden leer los resultados como objetos usando algún RowMapper.

Así que decidí hacer mi propio generados de Stream para usar con un JdbcTemplate. Al final, terminé con uno que es realmente más genérico y se puede usar con cualquier código que genere elementos ya sea de manera finita o infinita (aunque para streams infinitos existe una API mucho más simple). No es suficiente material como para generar una biblioteca, así que decidí publicarlo como un post.

El reto

Primero que nada, hay que considerar que los streams son lazy, es decir la evaluación se difiere hasta el momento en que realmente se tiene que hacer y aún así se van obteniendo elementos de una fuente conforme se necesiten. Al crear un stream y definir operaciones sobre el mismo, no ocurre actualmente nada, hasta que se realice alguna operación que requiera realmente recorrer los elementos del stream y aplicar las operaciones definidas. Hay operaciones que se aplican a un stream completo (como contar los elementos, o juntarlos en otra colección), y hay operaciones de corto circuito (como determinar si al menos un elemento del stream pasa un filtro).

Así que queremos crear un stream que va a obtener sus elementos de una consulta a base de datos, por lo que se necesita diferir dicha consulta hasta el momento en que realmente se necesita, ya que requiere mantener una conexión abierta.

La única manera en que pude hacer que esto funcionara fue usando dos hilos: un productor en el cual se ejecuta la consulta y los resultados alimentan al stream, y un consumidor donde se leen los elementos del stream.

Necesitamos además un buffer donde el productor pueda poner elementos para que el consumidor los vaya tomando. Una LinkedBlockingQueue es perfecta para esto.

Entonces, el código queda así:

    public static <T> Stream<T> streamForQuery(int bufferSize, T endOfStreamMarker,
                                               Consumer<Consumer<T>> query) {
        final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(bufferSize);
        //Este es el consumidor que se le pasa al query;
        //recibe cada elemento que el query le pasa y lo pone en la cola.
        Consumer<T> filler = t -> {
            try {
                //Espera hasta 1 segundo intentando agregar a la cola
                //Si después de 1 segundo la cola sigue llena, algo grave pasó,
                //o muy probablemente hubo una operación de corto circuito en el stream.
                if (!queue.offer(t, 1, TimeUnit.SECONDS)) {
                    //Esta excepción se arroja en el hilo productor para detenerlo.
                    log.error("Timeout waiting to feed elements to stream");
                    throw new BufferOverflowException();
                }
            } catch (InterruptedException ex) {
                System.err.println("Interrupted trying to add item to stream");
                ex.printStackTrace();
            }
        };
        //Usamos un Spliterator para el stream que devolvemos.
        return StreamSupport.stream(() -> new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
            //Esto es para saber si el productor ya arrancó
            private boolean started = false;
            //Guardar aquí una excepción si es que ocurre
            private volatile Throwable boom;
            /** Esto se llama una vez, antes de avanzar al primer elemento.
             * Esto arranca el hilo productor, que corre el query, y le pasamos el filler
             * Definido arriba. */

            private void startProducer() {
                //Obtener el hilo consumidor (donde corre esto inicialmente)
                Thread interruptMe = Thread.currentThread();
                //Aquí arrancamos el hilo productor
                new Thread(() -> {
                    try {
                        //Corre el query con nuestro consumidor especial
                        query.accept(filler);
                    } catch (BufferOverflowException ignore) {
                        //El filler arrojó esto, o sea que la cola no está siendo consumida
                    } catch (Throwable thr) {
                        //Guardar la excepción para que el hilo lector haga algo con ella
                        boom = thr;
                        interruptMe.interrupt();
                    }
                }).start();
                started = true;
            }
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!started) {
                    startProducer();
                }
                try {
                    //Tomar un elemento de la cola y devolverlo, si no es fin de stream
                    //to the action consumer.
                    T t = queue.take();
                    if (t != endOfStreamMarker) {
                        action.accept(t);
                        return true;
                    }
                } catch (InterruptedException ex) {
                    if (boom == null) {
                        System.err.println("Interrupted reading from stream");
                        ex.printStackTrace();
                    } else {
                        //Arrojamos la excepción del productor en el hilo consumidor
                        throw new RuntimeException(boom);
                    }
                }
                return false;
            }
        }, Spliterator.IMMUTABLE, false);
    }

Y así es como ese código se usa (aquí seguimos el ejemplo con JdbcTemplate):

final MyRow marker = new MyRow();
Stream<MyRow> stream = ModelStream.streamForQuery(100, marker, callback -> {
    //Pasamos un RowCallbackHandler que a su vez pasa MyRows al callback
    jdbcTemplate.query("SELECT * FROM really_big_table_with_millions_of_rows",
                       rs -> { callback.accept(myRowMapper.mapRow(rs, 0)); }
    );
    //Pasamos el marker al callback para indicar el fin de stream
    callback.accept(marker);
});

Cuando ese código se ejecuta, el query no se ha realizado. Incluso se pueden hacer cosas como esto:

stream = stream.filter(row -> row.isPretty());

Y hasta ahí sigue sin pasar nada. Pero cuando se hace algo como esto:

Optional<MyRow> row = stream.skip(100_000).limit(1000).findAny();

Entonces ya se ejecuta el query, se leen ( y saltan ) los primeros cien mil registros, y luego se pasa cada registro por el filtro, hasta que uno pase el filtro o se hayan leído mil registros.

##Un gran poder conlleva gran responsabilidad

Es muy tentador usar esto para simplemente hacer SELECT * FROM loquesea y filtrar en memoria. Por favor, por lo que más quieran, no lo hagan. Filtrar en memoria jamás será un reemplazo adecuado para un buen WHERE con índices bien diseñados en sus tablas.

Yo he utilizado esto principalmente para reportes, donde puedo concatenar streams de tipos disjuntos mapeando sus elementos a un tipo común para después continuar su procesamiento. Básicamente, estoy compensando por la falta de tipos unión en Java.

Ahora, habiendo dicho eso, está bastante chingón poder leer registros de una base de datos en modo de streaming. Tal vez un día veamos algo así ya integrado en Spring JdbcTemplate, o en jOOQ o incluso a nivel de JDBC...

Comentarios

Opciones de visualización de comentarios

Seleccione la forma que prefiera para mostrar los comentarios y haga clic en «Guardar las opciones» para activar los cambios.
Imagen de Nopalin

Suena bastante bien la idea,

Suena bastante bien la idea, pero no me queda claro cual es la ganancia de éste mecanismo (mas que sugar sintax desde donde lo veo). Los stream trabajan con las listas ya en memoria y solo le vas indicando acciones que se ejecutan hasta cierto periodo (lo que elimina duplicidad de iteraciones y otras cosas más). Como bien dices nada reempalza un buen where con indices, y yo tambien uso stream en los reportes pero mas que nada para ordenar, filtrar, crear mapas, etc.

Saludos

Imagen de ezamudio

Reportes, conciliaciones

Un caso donde esto es útil es generación de reportes, o archivos de conciliación, donde tienes que procesar millones de registros. No puedes hacer un simple query que te devuelva una lista, y es muy engorroso manejar JDBC a bajo nivel. Puedes usar un RowCallbackHandler directamente, es otra opción; este simplemente me parece que permite un uso un poco más idiomático.