Nota: Estoy leyendo un archivo de 1.5 GB y otro de 2.2, las lecturas no han tomado más de 12 segundos en una máquina con un procesador de 4 nucleos
------------------Inicio de código lectura concurrente de un archivo -----------------
-module(file_pread).
-compile([native]). -export([start/2]). -include_lib("kernel/include/file.hrl"). start(FileName, ProcNum) -> [start(FileName, ProcNum, Fun) || Fun <- [fun read_file/3, fun pread_file/3]]. start(FileName, ProcNum, Fun) -> Start = now(), Main = self(), Collector = spawn(fun () -> collect_loop(Main) end), Fun(FileName, ProcNum, Collector), %% don't terminate, wait here, until all tasks done. receive stop -> io:format("time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000]) end. collect_loop(Main) -> collect_loop_1(Main, undefined, 0). collect_loop_1(Main, ChunkNum, ChunkNum) -> Main ! stop; collect_loop_1(Main, ChunkNum, ProcessedNum) -> receive {chunk_num, ChunkNumX} -> collect_loop_1(Main, ChunkNumX, ProcessedNum); {seq, _Seq} -> collect_loop_1(Main, ChunkNum, ProcessedNum + 1) end. get_chunk_size(FileName, ProcNum) -> {ok, #file_info{size=Size}} = file:read_file_info(FileName), Size div ProcNum. read_file(FileName, ProcNum, Collector) -> ChunkSize = get_chunk_size(FileName, ProcNum), {ok, File} = file:open(FileName, [raw, binary]), read_file_1(File, ChunkSize, 0, Collector). read_file_1(File, ChunkSize, I, Collector) -> case file:read(File, ChunkSize) of eof -> file:close(File), Collector ! {chunk_num, I}; {ok, _Bin} -> Collector ! {seq, I}, read_file_1(File, ChunkSize, I + 1, Collector) end. pread_file(FileName, ProcNum, Collector) -> ChunkSize = get_chunk_size(FileName, ProcNum), pread_file_1(FileName, ChunkSize, ProcNum, Collector). pread_file_1(FileName, ChunkSize, ProcNum, Collector) -> [spawn(fun () -> %% if it's the lastest chuck, read all bytes left, %% which will not exceed ChunkSize * 2 Length = if I == ProcNum - 1 -> ChunkSize * 2; true -> ChunkSize end, {ok, File} = file:open(FileName, [read, binary]), {ok, _Bin} = file:pread(File, ChunkSize * I, Length), Collector ! {seq, I}, file:close(File) end) || I <- lists:seq(0, ProcNum - 1)], Collector ! {chunk_num, ProcNum}.
Salidas:
8> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",2). time: 3796.00 ms time: 7313.00 ms [ok,ok] 9> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",2). time: 2937.00 ms time: 3594.00 ms [ok,ok] 10> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",4). time: 2391.00 ms time: 1906.00 ms [ok,ok] 11> file_pread:start("informacion_seccion_16_0014871946_0014871947_01.txt",4). time: 2110.00 ms time: 2328.00 ms [ok,ok]
El segundo parámetro por lo que entiendo es el número de procesadores que se quiere usar para procesar el archivo
------------------Fin de código lectura concurrente de un archivo -----------------
Interesante. Así que ésta sería una aplicación posible a
ResponderEliminarminería de datos, donde tienen que leerse archivos de
tamaño enorme, no? Saludos
Si, puede aplicarse a la minería de datos, pero en lo que estoy interesado es en aplicarle validaciones de estructura (que la longitud de campo sea correcta, que el tipo de dato sea válido, en caso de ser numérico que sea positivo o negativo, etc). Existe una validación que de verdad me pone a pensar, en la cual involucra saber que ninguno de los registros sean duplicados (lo que pienso que se puede hacer es obtener el hash de cada renglon, y la posición o línea, una vez que se tienen cargadas la columna renglón y la columna hash en memoria, ir a comparar sólo los hash que estén repetidos para saber si efectivamente el renglón se repite).
ResponderEliminarMuchas gracias a los que estén leyendo estas líneas.