lunes, 12 de diciembre de 2016

Posreflexión del sistema de limpieza propuesto

He aquí algunas de las preguntas que bien podrían plantearse en la etapa
de diseño de un sistema concurrente con cierta orientación a la Erlang, pero
que bien puede abstraerse lo suficiente como para pensarlas desligándose
de cualquier implementación, en principio.

  1. La primera pregunta es qué procesos estarán manteniendo en sí al sistema, en cuanto activos participantes del mismo bajo (presumiblemente) todas las circunstancias de desarrollo del mismo. Una identificación adecuada de los actores principales y de sus actividades o tareas asociadas hará que el sistema se encamine hacia el objetivo que pretende satisfacer (a propósito utilicé una terminología cercana a UML, pero sin demasiado apego: la idea es que un formalismo que permita el tratamiento concurrente de varios actores será de gran utilidad; en Erlang tales posibles actores serían "procesos").
  2. La siguiente parte bien puede llamarse una "coreografía": es un término común en el manejo del flujo de tareas (workflow), y en nuestro caso se traduce en determinar en qué momento los procesos entran en juego o dejan de tener preponderancia; es también un buen momento de pensar en la concurrencia entre procesos, para de esta manera armonizar en la medida de lo posible qué procesos harán una buena intervención para el logro del objetivo global. A diferencia del caso secuencial, no obstante, aquí la gran dificultad es que es complicado preveer todas las posibles formas en que el sistema podría desarrollarse, además de considerar (o no) las posibles trampas en que se podrían caer si se es demasiado optimista en el buen desarrollo del sistema sin mayores análisis.
  3. Lo siguiente sería el establecimiento de los "diálogos": a diferencia de lo que sería una puesta en escena (para seguir con la analogía teatral), aquí se espera que la comunicación entre los procesos no sea tan fija ni tan improvisada: no es tan fija en el sentido de que las mismas condiciones de desarrollo del sistema bien pueden necesitar "improvisaciones", que corresponde, aproximandamente, a una nula participación de los seres humanos y una confianza sana en que algunos obstáculos seran sorteados por las mismas capacidades de comunicación de los procesos; también, no puede ser tan improsivada ya que los diálogos deberían establecerse con los componentes de un conjunto bien definido de mensajes, cada mensaje manteniendo en su estructuración un protocolo, tema que sigue.
  4. Los protocolos de comunicación entre procesos deben seguirse en todo lo posible: reivindicando el punto 3, pensaríamos que existen realmente diálogos (y no monólogos) para que los procesos en comunicación tengan éxito en comunicarse, manteniendo la mismo tiempo una suerte de disciplina que se expresa en aquello que pueden comunicar y en aquello otro que pueden recibir como mensaje. Aún suponiendo que los mensajes se envíen y se reciban exitosamente (por al menos dos procesos) es necesario que haya filtros (guardias, guards, en inglés) para que solo aquellos mensajes filtrados sean procesados; en ocasiones los guardias son explícitos (con, por ejemplo, la condicional diferida when, como en el caso de probar que un número sea mayor que 0, o bien pueden ser implícitos, cuando el mecanismo de "pattern matching" (presente en algunos lenguajes de programación funcionales) permite que solo aquellos mensajes que casen con un patrón sean procesados.
  5.  Aunque no es buena idea suponer que nuestra herramienta estará tratando todas las posibilidades sin más es, no obstante, una buena decisión de diseño pensar que algunos tópicos sí están resueltos (al menos en esta etapa); tal es el caso del manejo de los mailbox (buzones) que cada proceso en Erlang posee: los buzones siguen una política de agenda de tratamiento de mensajes de tipo cola; si tal esquema es suficiente está bien; si no, pues es posible programar otros esquemas (por ejemplo, de colas de prioridad) para satisfacer algunas necesidades específicas.
  6. La parte de sincronía o asincronía debe resolverse cuanto antes. Erlang toma un punto de vista natural asíncrono, pero en ocasiones una sincronía es necesaria. Notando el programa previo en este blog, se quiso realizar una asincronía entre los trabajadores limpiadores y el gestor, pero no fue prudente: los trabajadores que completen su tarea tienen que reportarlo al gestor, y éste haría mal (el sistema fallaría) si no hiciera caso del estado actual de sus trabajadores, tomando compromisos de clientes si ton ni son al no percatarse de qué pueden y no pueden hacer sus trabajadores.
  7. Es de notar que en épocas recientes el adjetivo de "inteligente" abunda: aún en una etapa de modelación, es necesario considerar qué tareas pueden realizarse con algunas medidas (métricas) de éxito, para de esa forma tomar aquella opción que o sea óptima o se acerque lo más posible a tal optimalidad. El gran obstáculo al momento para el caso de los sistemas concurrentes es que su desarrollo puede ser imprevisible y no estar preparado para tomar una buena decisión de optimalidad, pero aún así es obligatorio por parte del arquitecto o diseñador considerar que tópicos comunes en el campo de inteligencia artificial pueden ser válidos aquí: por ejemplo, la idea de dividir y vencer, o bien de otorgar una medida (heurística) a la manera en cómo se ejecuta una tarea concurrentemente.

viernes, 9 de diciembre de 2016

Una empresa de limpieza en Erlang

%Un pequeño programa con concurrencia real, pero a la vez simulando
%una empresa que presta un servicio de limpieza. Ejecutar los siguientes
%comandos para darse una idea de qué hace:
%> c(lt1).
%> lt1:start().
%> Ls=lt1:pisoBasico(), gestor ! Ls.

%Metodología:
% Diseño multiagente con elementos clasificados en gestores y trabajadores.
% Módulos para cada agente: para un gestor y un trabajador.
% Identificación de capacidades de cada agente.
% Creación o diseño de protocolos de comunicación...
% Identificación de restricciones físicas para la simulación.


-module(lt1).
-export([start/0,start1/1,start2/1,gestionar/2,pisoBasico/0,es_piso/1, limpiar/3,
                             tomar/2,omitir/2,
                             dividir/1,sleep/1,retardar/1,
                             caso/2]).

es_parOrdenado({A,B}) when
          is_integer(A) and
          is_integer(B) -> true.

es_piso([]) -> true;
es_piso([P|Ls]) -> es_parOrdenado(P), es_piso(Ls).

start() ->
    register(gestor,spawn(lt1,gestionar,[0,0])),
    start1(gestor),
    start2(gestor).
%recibeRapido(trabajador1), recibeRapido(trabajador2)
%{trabajador1:6,trabajador2:3}

%gestionar(0,0) %servidor???
start1(Gestor) -> register(trabajador1,spawn(lt1,limpiar,[Gestor,trabajo1,6])).
start2(Gestor) -> register(trabajador2,spawn(lt1,limpiar,[Gestor,trabajo2,3])).

caso(N,M) ->
      if
         N>M -> io:format("hola",[]);
         N==M -> io:format("hollaaa",[]);
         N<M -> io:format("adios",[])
      end.

sleep(T) ->
     receive
     after T -> true
     end.

%AreaAsignada es la capacidad que un robot puede limpiar en términos de área.
%Por agregar, Id del trabajador
retardar([]) -> true;
retardar([_|Ls]) ->
         sleep(5000),
         retardar(Ls).

limpiar(Gestor,A,AreaAsignada) ->
              receive
                  {segmento,Segmento,ProcesoSolicitante} ->
                                   es_piso(Segmento),
                           retardar(Segmento),
io:format("~n Segmento de piso correcto y terminado~n~p~n",
                                          [Segmento]),                       
                             Gestor ! {terminado,A,self()}, %Palabras o msg a trab.
                             limpiar(Gestor,A,AreaAsignada);
                        {verificar, AreaALimpiar,ProcesoSolicitante} when
                                            (AreaALimpiar =< AreaAsignada) ->
                             ProcesoSolicitante ! {si,self()},
                             io:format("si~n",[]),
                             limpiar(Gestor,A,AreaAsignada);
                  {verificar, AreaALimpiar,ProcesoSolicitante} when
                                (AreaALimpiar > AreaAsignada)
                             ->
                             ProcesoSolicitante ! {no,self()},
                             io:format("no~n",[]),
                             limpiar(Gestor,A,AreaAsignada);
                 {limpiar, AreaALimpiar,ProcesoSolicitante} when
                                            (AreaALimpiar =< AreaAsignada) ->
                             ProcesoSolicitante ! {siPuedoLimpiar,self()},
                             io:format("sí puedo~n",[]),
                             AreaAsignadaNueva = AreaAsignada-AreaALimpiar,
                             limpiar(Gestor,A,AreaAsignadaNueva);
                {limpiar, AreaALimpiar,ProcesoSolicitante} when
                                (AreaALimpiar > AreaAsignada)
                             ->
                             ProcesoSolicitante ! {noPuedoLimpiar,self()},
                             io:format("no puedo~n",[]),
                             limpiar(Gestor,A,AreaAsignada);
               {alto,ProcesoSolicitante} -> io:format("Hasta luego, fue un placer",[])
     end.
                            

pisoBasico() -> [{X,Y} || X <-lists:seq(1,3),Y <-lists:seq(1,3)].

%tomar(N,Ls)

tomar(0,_) -> [];
tomar(N,[A|Ls]) -> [A|tomar(N-1,Ls)].
omitir(0,Ls) -> Ls;
omitir(N,[_|Ls]) -> omitir(N-1,Ls).

dividir(Ls) ->
        L=length(Ls),
        Mitad=L div 2,
        {tomar(Mitad,Ls),omitir(Mitad,Ls)}.


%Es una función asociada únicamente al gestor
gestionar(K1,K2) ->
%        link(trabajador1),
        if
           (K1==0) and (K2==0) -> true;
           (K1==0) and (K2==1) -> true;
           (K1==1) and (K2==0) -> true;
           (K1==1) and (K2==1) -> self() ! "TERMINADO!!!!~n"
        end,
        receive
           {reporte,terminado,IdProceso} ->
                                io:format("Listo~n~p",[IdProceso]),
                         gestionar(K1,K2);
           {Piso,IdProceso} ->
                            es_piso(Piso),
                       io:format("Piso a limpiar~n~p~n",[Piso]),
                {A,B}=dividir(Piso),
%                falta...
%                verificar si lo puede hacer...
%                             si lo puede hacer se prodece con segmento...
                trabajador1 ! {segmento, A, self()},
                trabajador2 ! {segmento, B, self()},
%    osito1@RieGau ! {segmento, A, self()},
%    osito2@RieGau ! {segmento, B, self()},
           
                gestionar(K1,K2);
        {terminado,trabajo1,Id1} ->
                    NK1=1,
                    gestionar(NK1,K2);
        {terminado,trabajo2,Id2} ->
                    NK2=1,
                    gestionar(K1,NK2);
        listo -> io:format("Sí~n",[]),
                  gestionar(K1,K2);
            alto -> io:format("Proceso gestor terminado~n",[])
        end.

%Por hacer o concluir:
% a) La idea de separar este archivo en varios otros, cada uno abarcando la
% funcionalidad de un agente específico.
% b) La posibilidad de que los archivos separados sean trabajados por programadores
% diferentes, cada programador concentrándose en elaborar o un agente o inclusive
% partes de un agente (comunicación, --infrarrojo [código de barras], actuadores, etc.).
% jabber
% c) La conceptualización de que toda comunicación entre agentes remotamente ubicados
% entre sí es mediante paso de mensajes (incluso cuando es un mismo robot interno).
%
%Falta una función de parte de los trabajadores para darse conocer con id.


viernes, 2 de diciembre de 2016

Map, filter y foldr en Erlang (funciones de alto orden)

Las funciones de alto orden son una herramienta común en la programación
funcional, con características particulares ausentes en lenguajes imperativos
tradicionales tales como Java o Python, los  cuales, no obstante, han intentado
"importar" estas funciones por lo útil y concisas que son.

Ejemplo:

2> lists:map(fun(X) -> X+2 end,lists:seq(1,10)). 
[3,4,5,6,7,8,9,10,11,12]

Explicación:
lists es un módulo estándard de Erlang, que se importa automáticamente
a su sola mención. Este método de invocar funciones dentro de bibliotecas
mediante el sufijo del módulo donde se definen se llama cualificación. Es
común en varios lenguajes (tales como Python) por su utilidad para "ocultar"
nombres de funciones que de otra manera podrían ocasionar conflictos
con algunos ya existentes o de parte del sistema o provistos por el usuario.
fun(X) -> X+2
es un ejemplo de una función lambda; el curioso nombre viene de un formalismo
básico computacional llamado cálculo lambda, el cual es un poco abstruso
pero en definitiva la base de lenguajes bien conocidos como Lisp o ML, y de
menos conocidos como Haskell o Scheme (a nivel general, no ya digamos en
México).
lists:map es una función de alto orden que reparte la aplicación de la función
de su primer argumento a cada uno de los elementos de la lista que es su
segundo argumento. Esperanzadoramente, el ejemplo ilustra bien la idea.
 Finalmente, lists:seq(1,10) crea un segmento de números naturales
representados como una lista del 1 al 10.

Ejemplo:
 3> lists:filter(fun(X) -> X<5 end, [3,4,1,7,8,9,10]).
[3,4,1]
Explicación:
lists:filter ahora toma un predicado (una función que devuelve verdadero
o falso) como primer argumento y seleccion algunos de los elementos de
una lista en su segundo argumento para "sobrevivir": solo aquellos que
satisfacen al predicado.

Ejemplo:
5> lists:foldr(fun(X,Y) -> X+Y end,0,lists:seq(1,10)).
55
Explicación:
Más complicado que los previos ejemplos, por construir.

Continuación de la explicación de foldr...
Primero, la parte fun(X,Y) -> X+Y end define una función lambda, también
llamada anónima. Las funciones lambda no tienen un nombre asignado, y
surgieron originalmente de los fundamentos del cálculo lambda (un formalismo
matemático para precisar la idea de computabilidad). La parte de 0 es la
de comienzo del operador; la parte de lists:seq(1,10) nos da una lista de 10
elementos. Matemáticamente, tendríamos que
lists:foldr(fun(X,Y) -> X+Y end,0, lists:seq(1,5))
sería la expresión
(1+(2+(3+(4+(5+0)))))
o sea, 15, y ahora se puede ver qué valor sería para la expresión dada
en 5>, previamente (sería la sumatoria desde 1 hasta 10, o sea, 55...).

Todas estas funciones más la construcción de listas compactas (ejemplo:
[X || X <- lists:seq(1,10)] ) originan una programación rápida, sin efectos
laterales, y que además se presta a tratamiento algebraico. Por ejemplo,
map (f (map g lista))= map (f o g) lista
evitando un doble recorrido en la lista, en donde f o g es la función
compuesta de f y g.

jueves, 1 de diciembre de 2016

Leer archivo archivo linea por linea de manera concurrente

Usaré este espacio para intentar codificar la lectura de un archivo linea por línea de manera concurrente, lo cual es de un gran interés para mi. Actualmente intento descifrar lo que hace un código que encontré en internet, pero yo creo que empezaré por lo básico, que es hacer los ejercicios que el Dr. Manuel ha propuesto en el blog.

Nota: Estoy leyendo un archivo de 1.5 GB y otro de 2.2, las lecturas no han tomado más de 12 segundos en una máquina con un procesador de 4 nucleos

------------------Inicio de código lectura concurrente de un archivo -----------------

-module(file_pread).
-compile([native]).

-export([start/2]).

-include_lib("kernel/include/file.hrl").

start(FileName, ProcNum) ->
    [start(FileName, ProcNum, Fun) || Fun <- [fun read_file/3, fun pread_file/3]].


start(FileName, ProcNum, Fun) ->
    Start = now(),  

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),

    Fun(FileName, ProcNum, Collector),
    
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000]) 
    end.

collect_loop(Main) -> collect_loop_1(Main, undefined, 0).
collect_loop_1(Main, ChunkNum, ChunkNum) -> 
    Main ! stop;
collect_loop_1(Main, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, ChunkNumX, ProcessedNum);
        {seq, _Seq} ->
            collect_loop_1(Main, ChunkNum, ProcessedNum + 1)
    end.

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

read_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).
    
read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, _Bin} -> 
            Collector ! {seq, I},
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.


pread_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    pread_file_1(FileName, ChunkSize, ProcNum, Collector).
       
pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
    [spawn(fun () ->
                   %% if it's the lastest chuck, read all bytes left, 
                   %% which will not exceed ChunkSize * 2
                   Length = if  I == ProcNum - 1 -> ChunkSize * 2;
                                true -> ChunkSize end,
                   {ok, File} = file:open(FileName, [read, binary]),
                   {ok, _Bin} = file:pread(File, ChunkSize * I, Length),
                   Collector ! {seq, I},
                   file:close(File)
           end) || I <- lists:seq(0, ProcNum - 1)],
    Collector ! {chunk_num, ProcNum}.

Salidas:

8> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",2).
time:    3796.00 ms
time:    7313.00 ms
[ok,ok]
9> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",2).
time:    2937.00 ms
time:    3594.00 ms
[ok,ok]
10> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",4).
time:    2391.00 ms
time:    1906.00 ms
[ok,ok]
11> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",4).
time:    2110.00 ms
time:    2328.00 ms
[ok,ok]

El segundo parámetro por lo que entiendo es el número de procesadores que se quiere usar para procesar el archivo

------------------Fin de código lectura concurrente de un archivo -----------------