ThreadPoolTaskExecutor

me recomendaron usar ThreadPoolTaskExecutor para manejar mi pool en mi socket java (me recomendaron que usara spring y lo inyectara)
ahora tengo una duda acabo de configurar bonecp con spring y el es mi pool de conexiones a oracle y si tengo un pool de hilos para mi socket java debo tener una relacion entre estos dos pool??? por ejemplo si mi socket java llegan 2500 conexiones simltaneas y configure un socket
50 hilos para atender esas 2500

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans <a href="http://www.springframework.org/schema/beans/spring-beans-2.0.xsd" title="http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">http://www.springframework.org/schema/beans/spring-beans-2.0.xsd</a> <a href="http://www.springframework.org/schema/util" title="http://www.springframework.org/schema/util">http://www.springframework.org/schema/util</a> <a href="http://www.springframework.org/schema/util/spring-util-2.0.xsd"" title="http://www.springframework.org/schema/util/spring-util-2.0.xsd"">http://www.springframework.org/schema/util/spring-util-2.0.xsd"</a> >
<bean id="TaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="50" />
<property name="maxPoolSize" value="60" />
<property name="queueCapacity" value="75" />
</bean>
</beans>

mi pool con bone cp debe tener el mismo numero de hilos??? o mayor???? o como lo debo cofigurar

Comentarios

Opciones de visualización de comentarios

Seleccione la forma que prefiera para mostrar los comentarios y haga clic en «Guardar las opciones» para activar los cambios.
Imagen de ezamudio

mmmm

Pues son dos cosas completamente distintas, pero si los trabajos que van a realizar los Runnables que metes al thread pool van a trabajar primariamente con base de datos, entonces sí es recomendable que coincida el número de hilos de tu thread pool con el número de conexiones del connection pool.

Lo que veo en tu config es que le estás poniendo capacidad máxima de 75 a la cola, eso significa que solamente podrás recibir hasta 135 conexiones a la vez (60 procesándose y 75 encoladas), si intentas encolar más te va a arrojar una RejectedExecutionException. Revisa los docs, creo que puedes ponerle 0 para que no tenga fin, o ponle una capacidad mayor al menos. Y si le pones cualquier capacidad que no sea ilimitada, te recomiendo encolar los runnables al pool dentro del try-catch para que en el bloque de catch RejectedExecutionException puedas al menos devolver un error a ese socket de que el server está saturado, y cerrar esa conexión.

Por último, una recomendación primariamente estilística: agrega el namespace p a ese XML para poder definir tus beans de manera más breve:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:p="http://www.springframework.org/schema/p"
 xsi:schemaLocation="http://www.springframework.org/schema/beans <a href="http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
" title="http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
">http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
</a>    <a href="http://www.springframework.org/schema/context" title="http://www.springframework.org/schema/context">http://www.springframework.org/schema/context</a> <a href="http://www.springframework.org/schema/context/spring-context-3.2.xsd">

<bean" title="http://www.springframework.org/schema/context/spring-context-3.2.xsd">

<bean">http://www.springframework.org/schema/context/spring-context-3.2.xsd">

...</a> id="threadpool" class="bla"
    p:corePoolSize="50"
    p:maxPoolSize="60"
    p:queueCapacity="2000" />
</beans>

vale ezamudio

voy a tener en cuenta tu opcion estilística.

los otro que me dices es que si voy a trabajar primariamente con la base de datos, la verdad si ya que el socket java recibe la trama y de ahí la paso aun procedimiento almacenado que se encarga de des-copilar la trama y hace el llamado al paquete correspondiente (pl-sql logic) al final devuelve esa trama de respuesta que envio de vuelta a los clientes (HOST).

te hable de tener 2500 peticiones simultaneas y en el cofig que me das veo p:queueCapacity="2000" p:maxPoolSize="60" la suma me da 2600
si en futuro tengo un crecimiento del doble 5000 peticiones simultanea los valores anteriores podria ser p:queueCapacity="4040" p:maxPoolSize="60" o seria mejor opción que nuestro p:maxPoolSize="60" inciara mayor????

con referente a dejar esto del pool TaskExecutor con los valores que tu pusiste

    p:corePoolSize="50"
    p:maxPoolSize="60"
    p:queueCapacity="2000" />

mi configuracion para bonecp para esas 2500 conexiones como deria quedar??
actualmente esta asi:

<bean id="dataSource" class="com.jolbox.bonecp.BoneCPDataSource"  destroy-method="close">
   
   <property name="driverClass" value="oracle.jdbc.OracleDriver" />
   <property name="jdbcUrl" value="jdbc:oracle:thin:@192.168.18.9:1521:siap" />
   <property name="username" value="siap"/>
   <property name="password" value="ac75siap"/>
   <property name="IdleConnectionTestPeriodInMinutes" value="60"/>
   <property name="IdleMaxAgeInMinutes" value="240"/>
   <property name="maxConnectionsPerPartition" value="30"/>
   <property name="minConnectionsPerPartition" value="10"/>
   <property name="partitionCount" value="3"/>
   <property name="acquireIncrement" value="5"/>
   <property name="statementsCacheSize" value="100"/>
   
    </bean>  
   

haciendo test con un solo un usuario

1. haciendo el test ya con bonecp y ejecutando un paquete oracle no se porque me demora 2sg en devolver respueta???
2. porque cuando uso srping para cofigurar bocecp me trae simpre en la consola.

dic 06, 2013 5:50:42 PM org.springframework.context.support.AbstractApplicationContext prepareRefresh
Información: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@18ca663: startup date [Fri Dec 06 17:50:42 COT 2013]; root of context hierarchy
dic 06, 2013 5:50:42 PM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
Información: Loading XML bean definitions from class path resource [resources/applicationContext.xml]
dic 06, 2013 5:50:42 PM org.springframework.beans.factory.support.DefaultListableBeanFactory preInstantiateSingletons
Información: Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1a44220: defining beans [dataSource,userDao,userService]; root of factory hierarchy

3.no se si esto lo hago bien

  public void run(){
            String inputLine,outputLine;
            TramaService ts = new XXXX_TramaService(threadID, threadActive,puerto);
           /*En el constructor de TramaService tengo la inyeccion  a los daos

                   ApplicationContext context = new ClassPathXmlApplicationContext("resources/applicationContext.xml");
                   userService = (UserService) context.getBean("userService");

           */


           
            try {
                   
                    while ((inputLine= entrada.readLine()) != null) {
                        outputLine=ts.processInput(inputLine);
                        salida.println(outputLine);
                       }

catch(.....)
{......}
}

Imagen de ezamudio

no

yo no hice cuentas por ti. Pero 2000+60 es 2060, no 2600. Y no me voy a poner a darle fine tuning a tu aplicación que ni siquiera sé realmente cómo funciona (ya sé que lo explicaste a grandes rasgos pero para fine tuning se necesita conocer los detalles de implementación y hacer un buen análisis de los cuellos de botella para saber qué valores tener en todos los parámetros).

Si haces una prueba con un solo usuario, en frío, no puedes esperar buenos tiempos. Cuánto tiempo de esos 2s se fueron en el JIT? en cargar las clases que no estaban cargadas? Primero debes hacer una prueba, ignorar los tiempos, esperar unos segundos, y hacer una segunda prueba para ver un tiempo más realista.

No entiendo bien lo del punto 3 pero si ese run es el de los runnables que metes al pool, estás creando un application context PARA CADA TAREA que se ejecuta, eso es el peor overhead que puedes tener. Sólo necesitas crear un application context cuando inicia el servicio y todos los runnables usarán los beans que están en ese application context (si necesitas inyectar varias cosas a los runnables, considera definir un bean con scope prototype en el application context para obtener nuevos runnables de ahí en vez de crearlos por código).

Y no entiendo eso de que descompilas una trama pero no suena nada bien. Si es XML pues mejor usa SOAP y haz un web service normalito en vez de meterte en broncas de manejar el pool de sockets y todo eso. Si es un protocolo propietario pues sí, tienes que parsear lo que te mandan, etc.

Incluso considera que podrías necesitar más de un pool de hilos, dependiendo de lo que haga tu aplicación. No entiendo por qué tu parser es un stored procedure (si es que entendí bien) pero pues eso es tu bronca. Pero podrías tener varios pools:

- Uno donde encolas las conexiones entrantes, sólo para parsear las tramas
- Otro donde encolas las tramas ya parseadas, para procesarlas
- Otro donde encolas las tramas procesadas, para generar y enviar las respuestas

Por qué tres en vez de uno solo? Porque parsear la trama, procesarla y generar la respuesta son tres cosas muy distintas y cada una necesita recursos distintos y tarda un tiempo muy distinto. El primer pool podría tener muy pocos hilos porque las tareas se despachan muy rápido; el segundo tener más hilos y es el que debe estar sincronizado con las conexiones a base de datos, y el tercero también ser muy chico porque generar la respuesta es muy rápido.

respuesta del punto 3

    Sólo necesitas crear un application context cuando inicia el servicio y todos los runnables usarán los beans que están en ese application context (si necesitas inyectar varias cosas a los runnables, considera definir un bean con scope prototype en el application context para obtener nuevos runnables de ahí en vez de crearlos por código).

Lo que te entendi fue hacaer esto:

   <bean id="taskExecutor"
                class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
                <property name="corePoolSize" value="50" />
                <property name="maxPoolSize" value="2000" />
                <property name="queueCapacitye" value="10" />
        <!--    <property name="WaitForTasksToCompleteOnShutdown" value="true" />-->
        </bean>

en el socket java en la funcion principal:

        ApplicationContext context = new ClassPathXmlApplicationContext("Spring-Config.xml");
       ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) context.getBean("taskExecutor");

lo que no entiendo es:
si necesitas inyectar varias cosas a los runnables, considera definir un bean con scope prototype en el application context para obtener nuevos runnables de ahí en vez de crearlos por código

haciendo algunas pruebas...

segun lo que comentas del context debe ser cargado una única vez
asi que lo maneje de la siguinete manera publico la clase completa para que veas como manejo una clase anidada que es la que extiende del runnable.

 public ServidorService(String puerto){
            ApplicationContext context = new ClassPathXmlApplicationContext("resources/applicationContext.xml");
            serviceBusinnes = (ManagedBean) context.getBean("service");
            ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) context.getBean("TaskExecutor");
try {
             this.puerto=puerto;
             readSocketTimeout=readSocketTimeout*Integer.parseInt("7");
             serverSocket = new ServerSocket(5555);
            System.out.println("server Jasak Running!!");
            while (true) {
                socket = serverSocket.accept();
                socket.setSoTimeout(readSocketTimeout);
                System.out.println(String.format("ThreadId (van %d)", threadID.incrementAndGet()));
                System.out.println(String.format("ThreadActive (van %d)", threadActive.incrementAndGet()));
                WriteLog.save("[PORT=" + puerto + "][THREAD_ID=" + threadID + "] : STARTED " + threadActive + " Hilos Activos.");
                 taskExecutor.execute(new ListenerSocket(socket));
            }
        } catch (IOException ex) {
            WriteLog.save("[PORT=" + puerto + "][THREAD_ID=" + threadID  + "] : " + ex.toString());
        }
        finally{
            System.gc();
        }
    }
    private class ListenerSocket implements Runnable {
        private BufferedReader entrada;
        private PrintWriter salida;
        public ListenerSocket(Socket socket)
        {
            try {
                this.entrada = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                this.salida  = new PrintWriter(socket.getOutputStream(),true);
               
                } catch (IOException ex) {
                Logger.getLogger(ServidorService.class.getName()).log(Level.SEVERE, null, ex);
                }
        }
        @Override
        public void run(){
            String inputLine,outputLine;
           
           // serviceBusinnes = new ManagedBeanImpl();
           // esta clase que creo aquí es la que tengo duda, no se si esta debería  se un singlenton o mejor dicho inyectarla al context
           // pero como le mandaria los parametros que le estoy pasando??  
            TramaService ts = new JASAK_TramaService(threadID, threadActive,puerto,serviceBusinnes);
           
            try {
                while ((inputLine= entrada.readLine()) != null)
                {
                    outputLine=ts.processInput(inputLine);
                    salida.println(outputLine);
                     
                    }
                 }
            catch (SocketTimeoutException ex){
                   try {
                          System.out.println("adios cliente");
                          WriteLog.save("[PORT=" + puerto+ "][THREAD_ID=" + threadID  + "] : " + ex.toString());
                             
                          if(salida!=null)  salida.close();
                          if(entrada!=null) entrada.close();
                          if(socket!=null)  socket.close();
                       }  catch (IOException ex1) {}
            }
            catch (SocketException se) {
                     try {
                               //System.out.println("cliente se desconecta");
                               //WriteLog.save("[PORT=" + socket.getPort()+ "][THREAD_ID=" + threadID  + "] : " + se.toString());
                               if(salida!=null)  salida.close();
                               if(entrada!=null) entrada.close();
                               if(socket!=null)  socket.close();
                          }    catch (IOException ex1) {}
              }
            catch (IOException ex)
               {
                WriteLog.save("[PORT=" + socket.getPort()+ "][THREAD_ID=" + threadID  + "] : " + ex.toString());
                try {
                    if(salida !=null) salida.close();
                    if(entrada !=null) entrada.close();
                    if(socket !=null) socket.close();
                } catch (IOException ex1) {
                    Logger.getLogger(ServidorService.class.getName()).log(Level.SEVERE, null, ex1);
                }
               }
                finally
               {
                   try {
                     if(salida  !=null) salida.close();
                     if(entrada !=null) entrada.close();
                     if(socket  !=null) socket.close();
                     threadActive.getAndDecrement();
                     //System.gc();
                   } catch (IOException ex) {
                     System.out.println("problemas al cerrar socket.");
                }}}}}

la clase anterior hago uso de la siguiente clase que me maneja las opciones
segun la trama que me llega en este ejemplo si el cliente manda "PING" server responde "PONG"

    public class JASAK_TramaService implements TramaService {
    AtomicInteger threadID;
    AtomicInteger threadActive;
    String puerto;
    private ManagedBean serviceBusinnes;
    public JASAK_TramaService(AtomicInteger threadID,AtomicInteger threadActive,String puerto,ManagedBean serviceBusinnes)
    {
        this.threadID=threadID;
        this.threadActive=threadActive;
        this.puerto=puerto;
        this.serviceBusinnes=serviceBusinnes;
       
    }
    @Override
    public String processInput(String inputLine)
    {
        String outputLine = null;
        Date date= new Date();
        long dateTime=date.getTime();
        WriteLog.save(" JASAK RECIBIENDO : " + inputLine + " fecha/hora: " + date.toString());
        /*logic here BENGIN*/
        if("PING".equals(inputLine)){
              outputLine="PONG";
           }
        else if
        {
          //futuras tramas aparte de PING  aqui seria algo asi
             outputLine=funcionService(); //esta funcion service esta conectada a un dao y dicho dao ejecuta un procedimiento almacenado que me devuelve una trama de respuesta
 
        }
        /*logic here END*/
        Date secondDate= new Date();
        long timeDelay=(int)((secondDate.getTime()-dateTime)/1000);
        WriteLog.save(" JASAK ENVIANDO   : " + outputLine + " fecha/hora: " + secondDate.toString() + " EN " + timeDelay + " seg");
        return outputLine ;
    }

algo que me gustaria saber si el manejo de la clase JASAK_TramaService es el correcto si al manejarlo asi no voy a tener problemas
de tramas o sea que va ser (THREAD SAFE).????

si manejo una clase de utilidades donde todos sus métodos son estáticos debo trabajar cada método como sincronizado o no es necesario???

Imagen de ezamudio

sorry

no voy a leer todo ese código.

Una manera muy sencilla de usar el application context de Spring es que ahí "alambras" toda tu aplicación; defines todos tus componentes y los conectas entre sí, y luego solamente necesitas desde un main cargar ese application context, y en ocasiones obtener un bean del contexto para invocar un método y echar a andar toda la cosa.

Por tanto en tu código sólo necesitas tener algún Runnable que es el que inicia toda la aplicación. Dado que tienes una aplicación que escucha en un puerto TCP para recibir conexiones, ahí podría ser el punto de partida. Digamos que tienes algo así:

class Servidor implements Runnable, ApplicationContextAware {
  int puerto
  ThreadPoolExecutor tpool
  ApplicationContext ctxt

  void run() {
    ServerSocket server = new ServerSocket(puerto)
    while (true) {
      Socket conn = server.accept();
      Conexion task = ctxt.getBean("worker");
      task.socket = socket
      tpool.execute(task)
    }
  }
}

//Por otra parte tienes tu Conexion
class Conexion implements Runnable {
  @Resource DataSource dataSource
  @Resource OtraCosa otroBean
  Socket socket

  void run() { ... }
}

Y luego tienes un application context donde defines cosas más o menos así:

<bean id="threadPool" class="blabla esto ya lo tienes" />
<bean id="main" class="Servidor"
    p:puerto="1234" p:tpool="threadPool" />
<bean id="dataSource" class="tu datasource de bonecp" />
<bean id="worker" class="Conexion" scope="prototype"
    p:otroBean="otro bean que tengas por ahi" />

Entonces en la misma clase Servidor o en alguna otra puedes tener tu main:

static void main(String[] args) {
  ApplicationContext ctxt = //cargar de xml o lo que sea
  ctxt.getBean("main").run()
}

y corres la aplicación invocando ese main y ya. Lo que ocurrirá es que se carga el application context, se crean los componentes necesarios (datasource, threadpool, server, etc EXCEPTO ese bean de worker porque tiene scope prototype) y luego obtienes el bean main y lo ejecutas (invocas su método run) con lo que empieza a escuchar en el puerto configurado.

Cuando reciba una conexión, va a crear una instancia de Conexion, pero no usando new sino a través del application context de Spring; el bean worker está definido como prototipo por lo que cada vez que le pidas uno al application context, va a crear uno nuevo y le va a inyectar lo que necesita y te lo devuelve ya configurado. Nada más necesitas pasarle el socket y ya, está listo para realizar su trabajo.

runnable en clase servidor

con que fin en tu clase servidor hacer el runnable???
en mi clase servidor la manejo asi:
como se que no le gusta leer tanto codigo dejo lo relevante

public class ServidorService {
   
    public ServidorService(String puerto){
           
            ApplicationContext context = new ClassPathXmlApplicationContext("resources/applicationContext.xml");
            serviceBusinnes = (ManagedBean) context.getBean("service");
            ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) context.getBean("TaskExecutor");
     
            while (true) {
                socket = serverSocket.accept();
                socket.setSoTimeout(readSocketTimeout);
                System.out.println(String.format("ThreadId (van %d)", threadID.incrementAndGet()));
                System.out.println(String.format("ThreadActive (van %d)", threadActive.incrementAndGet()));
                WriteLog.save("[PORT=" + puerto + "][THREAD_ID=" + threadID + "] : STARTED " + threadActive + " Hilos Activos.");
               
                // AQUI MI CLASE LISTENERSOCKET ES LA QUE IMPLEMENTA EL RUNNABLE (ESTA CLASE ES UNA INTERTA DE MI SERVIDOR)
                taskExecutor.execute(new ListenerSocket(socket));
            }
       
    }

   private class ListenerSocket implements Runnable {

       public void run(){

while ((inputLine= entrada.readLine()) != null)
                {
outputLine=ts.processInput(inputLine);     //mi trama que llega la mando a processinput la procesa y me devuelve el resultado
                       
}
   }

con referente Conexion

esta clase la veo como analoga a la que yo manejo como ListenerSocket de esta clase no estido porque
haces aqui @Resource DataSource dataSource??? el del bean @Resource OtraCosa otroBean si no le veo problema , pero ese datasource no deberia ir en los daos???

class Conexion implements Runnable {
  @Resource DataSource dataSource
  @Resource OtraCosa otroBean
  Socket socket
  void run() { ... }
}
Imagen de ezamudio

como sea

es un ejemplo de un bean con scope prototype que se le inyectan componentes a cada instancia que se crea. Es un ejemplo, no estoy diseñando tu aplicación. Imagínate que inyecté ahí un DAO que a su vez tiene el DataSource.