From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.1.3 (2006-06-01) on yquem.inria.fr X-Spam-Level: X-Spam-Status: No, score=0.0 required=5.0 tests=none autolearn=disabled version=3.1.3 Received: from discorde.inria.fr (discorde.inria.fr [192.93.2.38]) by yquem.inria.fr (Postfix) with ESMTP id 22C7DBC69 for ; Wed, 30 May 2007 11:52:58 +0200 (CEST) Received: from moutng.kundenserver.de (moutng.kundenserver.de [212.227.126.188]) by discorde.inria.fr (8.13.6/8.13.6) with ESMTP id l4U9qvBW016919 for ; Wed, 30 May 2007 11:52:57 +0200 Received: from [141.84.136.30] (helo=[152.78.96.56]) by mrelayeu.kundenserver.de (node=mrelayeu4) with ESMTP (Nemesis), id 0ML21M-1HtKrQ0Mt4-0000yJ; Wed, 30 May 2007 11:52:56 +0200 Message-ID: <465D4978.8010904@functionality.de> Date: Wed, 30 May 2007 10:52:56 +0100 From: Thomas Fischbacher User-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.7.12) Gecko/20060607 Debian/1.7.12-1.2 X-Accept-Language: en MIME-Version: 1.0 To: Jon Harrop Cc: caml-list@inria.fr Subject: Re: [Caml-list] Faking concurrency using Unix forks and pipes References: <200705300442.59906.jon@ffconsultancy.com> In-Reply-To: <200705300442.59906.jon@ffconsultancy.com> Content-Type: text/plain; charset=us-ascii; format=flowed Content-Transfer-Encoding: 7bit X-Provags-ID: V01U2FsdGVkX18icDDA+Qi40ePB8wPQxVZA+rCX+36hwFHsn2c nliVc7oTS0aw1jWKfoh4Hn03J4HLhClbFBdlthPpMFqs3lCsml puL+lGlMVHf5RIDBCiNgw== X-Miltered: at discorde with ID 465D4979.002 by Joe's j-chkmail (http://j-chkmail . ensmp . fr)! X-Spam: no; 0.00; forks:01 ocaml:01 forks:01 marshalling:01 ocaml:01 workload:01 printf:01 fprintf:01 stderr:01 fine-grained:01 scheduler:01 overkill:01 contrib:01 waitpid:01 contrib:01 Jon Harrop wrote: > Has anyone implemented a parallel map function in OCaml using Unix forks, > pipes and maybe marshalling? > > This seems like an easy way to get concurrency in OCaml... That is indeed an exercise I like to pose to my PhD students. (Of course, the question whether this really makes that much sense is a different issue...) Here is my own suggestion how to do it: let compute_uniform_workload_forked ?(bailout= (fun str -> let () = Printf.fprintf stderr "AIEE! %s\n%!" str in exit 1)) ~fun_combine v_work = let bailout s dummy = let _ = bailout s in dummy in (* Note that we use the "bailout" function in two different places where it expects different return types. Hence, we have to bend over backwards to get the type system to accept what we actually want to do... *) let nr_processes = Array.length v_work in let rec setup_childs nr_process child_info = if nr_process = nr_processes then List.rev child_info (* This ensures we get the data in proper order. *) else let (fd_read,fd_write) = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in let pid = Unix.fork () in if pid == (-1) (* fork failure *) then bailout "fork() failure!" child_info else if pid == 0 (* We are the child - compute our share and exit *) then let () = Unix.close fd_read in let s_write = Unix.out_channel_of_descr fd_write in let result = v_work.(nr_process) () in let () = Marshal.to_channel s_write result [] in exit 0 else (* We are the parent *) let () = Unix.close fd_write in let s_read = Unix.in_channel_of_descr fd_read in setup_childs (1+nr_process) ((s_read,pid)::child_info) in let all_childs_info = setup_childs 1 [] in (* Note that it is important that we start counting at 1 here, as the parent will do chunk #0! *) let result_chunk0 = v_work.(0) () in (* Note that we just do assume that all pieces of the computation take the same time. We are not trying to be overly sophisticated, fetching data from the fastest child first. Also, if we wanted a more powerful tool to compute with forked processes, we might want to divide the big task in a more fine-grained way and hand out sub-tasks to processes through a scheduler that takes care of when which process finishes which sub-task. For now, this is overkill. *) let rec collect_child_results have child_info_todo = match child_info_todo with | [] -> have | ((s_read,pid)::child_info_todo_next) -> let contrib = Marshal.from_channel s_read in let (returned_pid,status) = Unix.waitpid [] pid in if status <> Unix.WEXITED 0 then bailout "Child failure!\n%!" have else collect_child_results (fun_combine contrib have) child_info_todo_next in collect_child_results result_chunk0 all_childs_info ;; (* --- (* === Example === *) let sum_of_inverse_squares = compute_uniform_workload_forked ~fun_combine:(fun a b -> a+.b) (let nr_processes=4 in let ranges=split_range nr_processes 1 100000 in let work subrange_start subrange_end = let () = Printf.printf "PID: %d SUB-RANGE %d - %d\n%!" (Unix.getpid()) subrange_start subrange_end in let rec walk n sum = if n = subrange_end then sum else walk (1+n) (let fn = float_of_int n in sum +. 1.0/.(fn*.fn)) in walk subrange_start 0.0 in (Array.init nr_processes (fun n -> let (r_s,r_e) = ranges.(n) in fun () -> work r_s r_e))) ;; (* This gives: 1.64492406679822967 The full sum would be pi^2/6 = 1.64493406684822641 *) --- *) -- best regards, Thomas Fischbacher tf@functionality.de