style="display:inline-block;width:728px;height:90px"
data-ad-client="ca-pub-5164839828746352"
data-ad-slot="7563230308">

Comunicación asíncrona entre procesos Java

En este post de jpaul estuve comentando acerca de algunas de las broncas de RMI, y de cómo hay opciones más eficientes para cuando se necesita implementar comunicación eficiente entre dos aplicaciones Java, usando algo similar a RMI.

Primero que nada, quiero mostrar cuál es el problema concreto con RMI: cada llamada que se recibe en la aplicación que publica el componente, se hace en un hilo separado. Para demostrar esto, tomé el ejemplo original y lo modifiqué un poco: El servidor tarda un poco de tiempo, y también imprime el nombre del hilo actual y el número total de hilos activos:

Random rng = new Random(System.currentTimeMillis())

Remote stub = UnicastRemoteObject.exportObject(new TestRemote() {
  @Override
  public String sayHello(String name) throws RemoteException {
    println "Corriendo en ${Thread.currentThread().name} total ${Thread.activeCount()}"
    if(rng.nextBoolean())Thread.sleep(10)
    return "Hello, " + name;
  }
}, 0);
def registry = LocateRegistry.createRegistry(Registry.REGISTRY_PORT);
registry.bind("Test", stub);
println "Escuchando..."

El cliente, por su parte, invoca 100 veces el método, primero de manera secuencial, para mostrar cómo la llamada es síncrona, y luego creando un hilo separado para cada llamada, lo cual causa que a pesar de que cada invocación al componente remoto sea síncrona, se hagan varias llamadas simultáneas:

def registry = LocateRegistry.getRegistry();
TestRemote testRemote = (TestRemote) registry.lookup("Test");
//Llamadas consecutivas
100.times {
  println testRemote.sayHello("JavaMexico seq $it")
}
//Llamadas concurrentes
100.times {
  new Thread({ ->
    println testRemote.sayHello("JavaMexico conc $it")
  } as Runnable).start()
}

La salida del programa Server nos muestra dos cosas: primero, que cada llamada que se hace secuencialmente desde el server, es síncrona, de modo que hasta que una termina, se realiza la otra. Por lo tanto, todas se pueden despachar con el mismo hilo del lado del server. Vamos a ver 100 mensajes como este primero:

Corriendo en RMI TCP Connection(2)-192.168.0.104 total 2

Y después, cuando se hacen llamadas simultáneas, vamos a ver algo así:

Corriendo en RMI TCP Connection(10)-192.168.0.104 total 12
Corriendo en RMI TCP Connection(11)-192.168.0.104 total 13
Corriendo en RMI TCP Connection(6)-192.168.0.104 total 13
Corriendo en RMI TCP Connection(12)-192.168.0.104 total 15
Corriendo en RMI TCP Connection(14)-192.168.0.104 total 15
Corriendo en RMI TCP Connection(13)-192.168.0.104 total 15
Corriendo en RMI TCP Connection(9)-192.168.0.104 total 15
Corriendo en RMI TCP Connection(2)-192.168.0.104 total 16
Corriendo en RMI TCP Connection(15)-192.168.0.104 total 16
Corriendo en RMI TCP Connection(16)-192.168.0.104 total 17
Corriendo en RMI TCP Connection(17)-192.168.0.104 total 17

La cantidad de hilos que se crean en el proceso servidor dependen de varios factores, pero la cosa es que no tenemos control sobre cuántos se van a crear. Debe haber una manera más sencilla de procesar esto, sin que sea secuencial, que sí haya cierta concurrencia, pero a la vez teniendo algo de control sobre los recursos que se utilizan en el server.

La manera es: no usar RMI directamente, sino emularlo de alguna forma. En este caso voy a hacerlo sin usar ninguna biblioteca adicional, aunque bien podría facilitarme la vida con algo como Netty.

Lo primero es que ya no voy a usar la interfaz TestRemote en el server, sino que voy a implementar esto de otra forma, simplemente el código que despacha las peticiones será un Runnable. Y tengo que crear un ServerSocket que escuche para recibir conexiones, y tener un componente que maneje cada conexión. La comunicación será asíncrona, es decir, los mensajes no serán contestados necesariamente en el orden en que llegan.

Y para todo esto necesito una clase nueva, que represente el mensaje que voy a intercambiar con el server. Dado que en este caso solamente estamos enviando una cadena, sólo necesito esa misma cadena, pero con un identificador único para cada mensaje. Aunque en este caso particular se puede usar la misma clase tanto para enviar la petición al server como para recibir la respuesta, voy a definir dos diferentes.

class Peticion implements Serializable {
  long id
  String msg
  String toString() { "Peticion[$id $msg]" }
}
class Respuesta implements Serializable {
  long id
  String msg
  Date fecha
  String toString() { "Resp[$id $msg]" }
}

Dado que con RMI no estamos manejando autenticación de ningún tipo, por ahora tampoco me voy a molestar con eso para este ejemplo. El server es muy simple. Lo voy a partir en tres componentes: El que recibe las conexiones, el que maneja cada conexión, y el que procesa las peticiones.

class Server implements Runnable {
  int port
  Procesador proc

  void run() {
    final ServerSocket ss = new ServerSocket(port)
    println "Servidor escuchando en puerto ${port}"
    while (true) {
      Socket s = ss.accept()
      println "Nueva conexion desde ${s.remoteSocketAddress}"
      new Thread(new Conexion(sock:s, proc:proc)).start()
    }
  }
}

Cada que llega una nueva conexión remota, se crea un nuevo objeto que se echa a andar en un hilo dedicado. Este es el esquema hilo-por-conexión y es eficiente en un ambiente donde sabemos que habrá pocos clientes (pocas conexiones remotas). Para poder manejarse de manera asíncrona, se necesitan a su vez dos hilos para cada conexión: uno para lectura y otro para escritura. El de lectura será el que ya se tiene, donde se ejecuta el objeto Conexion (que es un Runnable):

class Conexion implements Runnable {
  Socket sock
  Procesador proc
  private LinkedBlockingQueue<Respuesta> salida = new LinkedBlockingQueue()
  void run() {
    def outs = new ObjectOutputStream(new BufferedOutputStream(sock.outputStream))
    outs.flush()
    def ins = new ObjectInputStream(new BufferedInputStream(sock.inputStream))
    //Hilo para escritura
    new Thread({->
      while(true) {
        Respuesta r = salida.take()
        outs.writeObject(r)
        outs.flush()
      }
    } as Runnable).start()
    println "Inicia ciclo lectura"
    while (true) {
      Peticion req = ins.readObject()
      println "Llega $req tenemos ${Thread.activeCount()}"
      proc.procesa(req, this)
    }
  }

  void send(Respuesta resp) {
    salida.put(resp)
  }
}

Cada que llega una petición nueva, se pasa al procesador de peticiones. Ese, a su vez, deberá crear una Respuesta y enviarla de regreso al cliente, por medio del mismo objeto Conexion:

class Procesador {
  private final ExecutorService pool = Executors.newFixedThreadPool(4)
  private final Random rng = new Random(System.currentTimeMillis())

  void procesa(Peticion req, Conexion c) {
    pool.execute({->
      Respuesta resp = new Respuesta(id:req.id, msg:"Hola, ${req.msg}!", fecha:new Date())
      if (rng.nextBoolean())Thread.sleep(10)
      c.send(resp)
    } as Runnable)
  }
}

El retardo aleatorio lo pueden quitar y poner a su antojo para ver la diferencia en el cliente, cuando ejecuten estos ejemplos.

Finalmente, el programa para correr el servidor, una vez que se tienen todas las clases:

Procesador proc = new Procesador()
Server server = new Server(proc:proc, port:1234)
server.run()

Del lado del cliente, decidí encapsular en un componente la lógica que maneja la conexión (envío de peticiones, lectura de respuestas), y dejar aparte la generación de varias peticiones para enviarse de manera simultánea o secuencial. Y de hecho sólo voy a incluir el envío secuencial; les dejo de tarea el envío simultáneo de peticiones, pero si se fijan bien en el diseño del cliente, no hará mucha diferencia cómo las envíen:

class Client implements Runnable {
  String host
  int port
  Listener listener
  private final LinkedBlockingQueue<Peticion> salida = new LinkedBlockingQueue()
  private Socket sock

  void send(Peticion req) {
    salida.put(req)
  }

  void disconnect() {
    sock.close()
  }

  void run() {
    sock = new Socket(host, port)
    ObjectOutputStream outs = new ObjectOutputStream(new BufferedOutputStream(sock.outputStream))
    outs.flush()
    ObjectInputStream ins = new ObjectInputStream(new BufferedInputStream(sock.inputStream))
    //Hilo de escritura
    Thread writer = new Thread({->
      while (true) {
        Peticion req = salida.take()
        println "Enviando $req"
        outs.writeObject(req)
        outs.flush()
      }
    } as Runnable)
    writer.start()
    //Lectura
    while (true) {
      Respuesta resp = ins.readObject()
      listener.responseReceived(resp)
    }
  }
}

Como mencioné anteriormente, hay un hilo dedicado a la escritura de peticiones, similar al que se tiene en el servidor: una cola a la cual se pueden agregar elementos desde varios hilos, con un solo hilo leyendo de la misma para escribir al socket. Ese Listener es una simple interfaz para evitar acoplamiento con el objeto que reciba la notificación de respuestas que llegan:

interface Listener {
  void responseReceived(Respuesta resp)
}

class Ejemplo implements Listener {
  private ConcurrentHashMap<Long,Peticion> reqs = new ConcurrentHashMap()
  Client client

  void responseReceived(Respuesta resp) {
    Peticion req = reqs.remove(resp.id)
    if (req) {
      println "Me contestan $resp"
    } else {
      println "Llega una respuesta cuya petición desconozco: $resp"
    }
  }

  void send(Peticion req) {
    reqs.put(req.id, req)
    client.send(req)
  }
}

Lo que hace este último objeto es llevar la cuenta de las peticiones que se han enviado, para saber qué respuesta corresponde con qué petición. Esto siempre es necesario en los esquemas de comunicación asíncronos.

Y finalmente el programa cliente:

Client cliente = new Client(host:'127.0.0.1', port:1234)
new Thread(cliente).start()
cliente.listener=new Ejemplo(client:cliente)
final AtomicLong ids = new AtomicLong()
500.times {
  long l = ids.incrementAndGet()
  Peticion req = new Peticion(id:l, msg:"Probando $l")
  cliente.listener.send(req)
}

Cuando ejecutan ambos (primero deben arrancar el servidor, luego el cliente por separado), podrán ver algo así en el servidor:

Llega Peticion[340 Probando 340] tenemos 9
Llega Peticion[341 Probando 341] tenemos 9
Llega Peticion[342 Probando 342] tenemos 9
Llega Peticion[343 Probando 343] tenemos 9
Llega Peticion[344 Probando 344] tenemos 9
Llega Peticion[345 Probando 345] tenemos 9
Llega Peticion[346 Probando 346] tenemos 9
Llega Peticion[347 Probando 347] tenemos 9
Llega Peticion[348 Probando 348] tenemos 9

Verán que no pasa de 9 hilos la aplicación servidor. Esto es porque todas las peticiones que van llegando, se procesan en un ThreadPool de 4 hilos. Pero cada una se va contestando tan rápido como es posible. Por su parte, el cliente puede enviar cualquier cantidad de peticiones y estar recibiendo las respuestas mientras envía más peticiones; las primeras líneas de salida del programa cliente serán así:

Enviando Peticion[16 Probando 16]
Enviando Peticion[17 Probando 17]
Enviando Peticion[18 Probando 18]
Enviando Peticion[19 Probando 19]
Enviando Peticion[20 Probando 20]
Enviando Peticion[21 Probando 21]
Enviando Peticion[22 Probando 22]

Pero llega un momento en que verán mensajes de envío mezclados con mensajes de recepción de respuesta:

Enviando Peticion[314 Probando 314]
Me contestan Resp[41 Hola, Probando 41!]
Enviando Peticion[315 Probando 315]
Me contestan Resp[42 Hola, Probando 42!]
Enviando Peticion[316 Probando 316]
Me contestan Resp[43 Hola, Probando 43!]
Enviando Peticion[317 Probando 317]
Me contestan Resp[44 Hola, Probando 44!]
Enviando Peticion[318 Probando 318]
Me contestan Resp[45 Hola, Probando 45!]
Enviando Peticion[319 Probando 319]
Me contestan Resp[46 Hola, Probando 46!]
Enviando Peticion[320 Probando 320]
Me contestan Resp[47 Hola, Probando 47!]

Y finalmente verán ya solamente mensajes de respuesta:

Me contestan Resp[478 Hola, Probando 478!]
Me contestan Resp[479 Hola, Probando 479!]
Me contestan Resp[480 Hola, Probando 480!]
Me contestan Resp[481 Hola, Probando 481!]
Me contestan Resp[482 Hola, Probando 482!]
Me contestan Resp[483 Hola, Probando 483!]
Me contestan Resp[484 Hola, Probando 484!]
Me contestan Resp[485 Hola, Probando 485!]

Esto es por la manera en que enviamos los mensajes: fue una ráfaga de varias peticiones y luego solamente esperar las respuestas. En un sistema real, es más común que haya cierta separación de tiempo entre los envíos, y las respuestas parezcan llegar de manera síncrona en periodos de baja actividad, pero en periodos de actividad elevada se podrá ver algo como esto: varias peticiones enviadas una tras otra, y las respuestas van llegando intercaladas con las peticiones que se siguen enviando.

Lo interesante es que el cliente ahora puede también enviar varias peticiones y esperar su respuesta de manera asíncrona, en vez de que cada invocación sea síncrona, como sucede con RMI, y que si se desea manejar algún esquema asíncrono, haya que hacer la conversión internamente en el proceso cliente. Hay que tomar en cuenta que con este esquema, el programa cliente utiliza una sola conexión con el servidor, por la cual envía cualquier cantidad de peticiones y recibe todas sus respuestas. Esto utiliza menos recursos en el servidor, ya que establecer cada conexión RMI toma cierto tiempo y requiere más recursos tanto en el proceso como en el equipo donde corre el server (un file descriptor por cada conexión en Linux, por ejemplo, además del Socket en el proceso Java, y los proxies de RMI involucrados).

Evidentemente es mucho más fácil implementar RMI que hacer todo esto, pero si su sistema llega a un punto en el que el esquema de RMI ya no les da para más porque tienen demasiadas conexiones en el server, lo cual se puede traducir en varios hilos, memoria, que se les acaben las conexiones a base de datos, etc, entonces esta es una buena alternativa (aunque ya en un sistema real, no conviene tanto implementarla como lo hice yo aquí, sino utilizar una biblioteca como Netty para simplificar el manejo de las conexiones y de la comunicación por sockets).

Comentarios

Opciones de visualización de comentarios

Seleccione la forma que prefiera para mostrar los comentarios y haga clic en «Guardar las opciones» para activar los cambios.

Re: cantidad de hilos

Es posible especificar el número máximo de hilos que utilizará RMI a través de la propiedad sun.rmi.transport.tcp.maxConnectionThreads. El valor por defecto es Integer.MAX_VALUE. Técnicamente es un número infinito. Aquí hay una lista de las propiedades que pueden especificarse. Pese a que estas propiedades no son parte de la API de RMI, están disponibles en las implementaciones de Sun († R.I.P.) y Oracle y —naturalmente— en el OpenJDK (como lo evidencia el código). Este post, aunque es de la era cuando todavía caminaban los dinosaurios sobre la tierra, puede ser muy informativo.

Imagen de ezamudio

:(

Buen tip, jpaul; no sabía que se puede configurar eso.

Pero no sirve la liga al post prehistórico...

Imagen de ezamudio

Y qué hace?

Supongamos que configuras máximo 2 hilos. Qué pasa entonces? Se niegan más conexiones si hay dos hilos ocupados, o se encolan para procesarse sólo en dos hilos?

Se niegan

Cuando el pool está saturado, las nuevas conexiones son rechazadas, como se aprecia en el código de una implementación. El cliente tendría que atrapar la excepción e intentarlo nuevamente. He ahí una limitante de RMI por la cual es necesario buscar alternativas.

Imagen de Richardmx

Invitación

Hola buen día Ezamudio

Antes que nada una disculpa por utilizar este foro como medio de comunicación, sin embargo
He oído bastantes referencias buenas de tí
Me gustaría hacerte una cordial invitación a un evento que estamos organizando
aquí en el estado de Puebla denominado FLISOL 2015
Nuestro Tecnológico es sede de este evento y nos encantaría que nos acompañaras y nos dieras una charla técnica sobre Spring Framework y los beneficios que ofrece este framework de desarrollo

Sede: Instituto Tecnológico Superior de Tepeaca a escaso unos 40 minutos de la ciudad de Puebla.
Día: 25 de Abril 2015

Esperamos contar con tu apreciable presencia y bueno quedo en espera de tu confirmación para ponernos de acuerdo como llegar a este Tecnológico
y demás detalles.

posdata: El Instituto esta dispuesto a pagarle sus viáticos si es requerido.

Atte. Ing Paulo Paredes
Coordinador de Flisol 2015 en Puebla
Tel. 2232751449 ext.110

style="display:inline-block;width:728px;height:90px"
data-ad-client="ca-pub-5164839828746352"
data-ad-slot="7563230308">