jueves, 1 de diciembre de 2016

Leer archivo archivo linea por linea de manera concurrente

Usaré este espacio para intentar codificar la lectura de un archivo linea por línea de manera concurrente, lo cual es de un gran interés para mi. Actualmente intento descifrar lo que hace un código que encontré en internet, pero yo creo que empezaré por lo básico, que es hacer los ejercicios que el Dr. Manuel ha propuesto en el blog.

Nota: Estoy leyendo un archivo de 1.5 GB y otro de 2.2, las lecturas no han tomado más de 12 segundos en una máquina con un procesador de 4 nucleos

------------------Inicio de código lectura concurrente de un archivo -----------------

-module(file_pread).
-compile([native]).

-export([start/2]).

-include_lib("kernel/include/file.hrl").

start(FileName, ProcNum) ->
    [start(FileName, ProcNum, Fun) || Fun <- [fun read_file/3, fun pread_file/3]].


start(FileName, ProcNum, Fun) ->
    Start = now(),  

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),

    Fun(FileName, ProcNum, Collector),
    
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000]) 
    end.

collect_loop(Main) -> collect_loop_1(Main, undefined, 0).
collect_loop_1(Main, ChunkNum, ChunkNum) -> 
    Main ! stop;
collect_loop_1(Main, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, ChunkNumX, ProcessedNum);
        {seq, _Seq} ->
            collect_loop_1(Main, ChunkNum, ProcessedNum + 1)
    end.

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

read_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).
    
read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, _Bin} -> 
            Collector ! {seq, I},
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.


pread_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    pread_file_1(FileName, ChunkSize, ProcNum, Collector).
       
pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
    [spawn(fun () ->
                   %% if it's the lastest chuck, read all bytes left, 
                   %% which will not exceed ChunkSize * 2
                   Length = if  I == ProcNum - 1 -> ChunkSize * 2;
                                true -> ChunkSize end,
                   {ok, File} = file:open(FileName, [read, binary]),
                   {ok, _Bin} = file:pread(File, ChunkSize * I, Length),
                   Collector ! {seq, I},
                   file:close(File)
           end) || I <- lists:seq(0, ProcNum - 1)],
    Collector ! {chunk_num, ProcNum}.

Salidas:

8> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",2).
time:    3796.00 ms
time:    7313.00 ms
[ok,ok]
9> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",2).
time:    2937.00 ms
time:    3594.00 ms
[ok,ok]
10> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",4).
time:    2391.00 ms
time:    1906.00 ms
[ok,ok]
11> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",4).
time:    2110.00 ms
time:    2328.00 ms
[ok,ok]

El segundo parámetro por lo que entiendo es el número de procesadores que se quiere usar para procesar el archivo

------------------Fin de código lectura concurrente de un archivo -----------------

2 comentarios:

  1. Interesante. Así que ésta sería una aplicación posible a
    minería de datos, donde tienen que leerse archivos de
    tamaño enorme, no? Saludos

    ResponderEliminar
  2. Si, puede aplicarse a la minería de datos, pero en lo que estoy interesado es en aplicarle validaciones de estructura (que la longitud de campo sea correcta, que el tipo de dato sea válido, en caso de ser numérico que sea positivo o negativo, etc). Existe una validación que de verdad me pone a pensar, en la cual involucra saber que ninguno de los registros sean duplicados (lo que pienso que se puede hacer es obtener el hash de cada renglon, y la posición o línea, una vez que se tienen cargadas la columna renglón y la columna hash en memoria, ir a comparar sólo los hash que estén repetidos para saber si efectivamente el renglón se repite).

    Muchas gracias a los que estén leyendo estas líneas.

    ResponderEliminar