diff -uNr /tmp/ocaml-3.08.1/byterun/backtrace.c ocaml-3.08.1/byterun/backtrace.c --- /tmp/ocaml-3.08.1/byterun/backtrace.c 2004-01-02 11:23:19.000000000 -0800 +++ ocaml-3.08.1/byterun/backtrace.c 2008-04-08 15:11:16.531426149 -0700 @@ -38,7 +38,6 @@ CAMLexport int caml_backtrace_pos = 0; CAMLexport code_t * caml_backtrace_buffer = NULL; CAMLexport value caml_backtrace_last_exn = Val_unit; -#define BACKTRACE_BUFFER_SIZE 1024 /* Location of fields in the Instruct.debug_event record */ enum { EV_POS = 0, diff -uNr /tmp/ocaml-3.08.1/byterun/backtrace.h ocaml-3.08.1/byterun/backtrace.h --- /tmp/ocaml-3.08.1/byterun/backtrace.h 2003-12-31 06:20:35.000000000 -0800 +++ ocaml-3.08.1/byterun/backtrace.h 2008-04-08 15:11:13.199062568 -0700 @@ -18,6 +18,8 @@ #include "mlvalues.h" +#define BACKTRACE_BUFFER_SIZE 1024 + CAMLextern int caml_backtrace_active; CAMLextern int caml_backtrace_pos; CAMLextern code_t * caml_backtrace_buffer; diff -uNr /tmp/ocaml-3.08.1/byterun/interp.c ocaml-3.08.1/byterun/interp.c --- /tmp/ocaml-3.08.1/byterun/interp.c 2004-06-12 03:40:52.000000000 -0700 +++ ocaml-3.08.1/byterun/interp.c 2008-04-09 14:12:07.527970931 -0700 @@ -31,6 +31,7 @@ #include "prims.h" #include "signals.h" #include "stacks.h" +#include "tecaml.h" /* Registers for the abstract machine: pc the code pointer @@ -58,7 +59,10 @@ # ifdef __ia64__ # define Next goto *(void *)(jumptbl_base + *((uint32 *) pc)++) # else -# define Next goto *(void *)(jumptbl_base + *pc++) +# define Next \ + { \ + goto *(void *)(jumptbl_base + *pc++);\ + } # endif # endif #else @@ -71,7 +75,12 @@ #define Setup_for_gc \ { sp -= 2; sp[0] = accu; sp[1] = env; caml_extern_sp = sp; } #define Restore_after_gc { accu = sp[0]; env = sp[1]; sp += 2; } -#define Setup_for_c_call { saved_pc = pc; *--sp = env; caml_extern_sp = sp; } + +#define Setup_for_c_call \ + { saved_pc = pc; \ + *--sp = env; \ + caml_extern_sp = sp; } + #define Restore_after_c_call { sp = caml_extern_sp; env = *sp++; } /* An event frame must look like accu + a C_CALL frame + a RETURN 1 frame */ @@ -188,6 +197,42 @@ static long caml_bcodcount; #endif +/* read and write logging for first class heaps -mkehrt */ + +#define ReadField(dest, ptr, offset) \ + { \ + if(log_heap) \ + { \ + log_heap_off(Val_unit); \ + Setup_for_c_call; \ + (dest) = caml_callback2(*caml_named_value("ml_heap_read_field"), \ + (value)(ptr), \ + Val_int(offset)); \ + Restore_after_c_call; \ + log_heap_on(Val_unit); \ + } \ + else \ + (dest) = Field((ptr),(offset)); \ + } \ + + +#define WriteField(ptr, offset, val) \ + { \ + if(log_heap) \ + { \ + log_heap_off(Val_unit); \ + Setup_for_c_call; \ + caml_callback3(*caml_named_value("ml_heap_write_field"), \ + (value)(ptr), \ + Val_int(offset), \ + (val)); \ + Restore_after_c_call; \ + log_heap_on(Val_unit); \ + } \ + else \ + Modify(&Field((ptr),(offset)), (val)); \ + } \ + /* The interpreter itself */ value caml_interprete(code_t prog, asize_t prog_size) @@ -217,7 +262,7 @@ struct caml__roots_block * volatile initial_local_roots; volatile code_t saved_pc; struct longjmp_buffer raise_buf; - value * modify_dest, modify_newval; + value * modify_dest, modify_newval; //XXX #ifndef THREADED_CODE opcode_t curr_instr; #endif @@ -245,6 +290,8 @@ caml_callback_depth++; saved_pc = NULL; + log_heap = 0; + if (sigsetjmp(raise_buf.buf, 0)) { caml_local_roots = initial_local_roots; sp = caml_extern_sp; @@ -585,7 +632,7 @@ *--sp = accu; /* Fallthrough */ Instruct(GETGLOBAL): - accu = Field(caml_global_data, *pc); + ReadField(accu, caml_global_data, *pc); pc++; Next; @@ -593,7 +640,7 @@ *--sp = accu; /* Fallthrough */ Instruct(GETGLOBALFIELD): { - accu = Field(caml_global_data, *pc); + ReadField(accu, caml_global_data, *pc); pc++; accu = Field(accu, *pc); pc++; @@ -601,7 +648,7 @@ } Instruct(SETGLOBAL): - caml_modify(&Field(caml_global_data, *pc), accu); + WriteField(caml_global_data, *pc, accu); accu = Val_unit; pc++; Next; @@ -687,49 +734,72 @@ /* Access to components of blocks */ Instruct(GETFIELD0): - accu = Field(accu, 0); Next; + ReadField(accu, accu, 0); Next; Instruct(GETFIELD1): - accu = Field(accu, 1); Next; + ReadField(accu, accu, 1); Next; Instruct(GETFIELD2): - accu = Field(accu, 2); Next; + ReadField(accu, accu, 2); Next; Instruct(GETFIELD3): - accu = Field(accu, 3); Next; + ReadField(accu, accu, 3); Next; Instruct(GETFIELD): - accu = Field(accu, *pc); pc++; Next; + ReadField(accu, accu, *pc); pc++; Next; Instruct(GETFLOATFIELD): { - double d = Double_field(accu, *pc); Alloc_small(accu, Double_wosize, Double_tag); - Store_double_val(accu, d); + if(log_heap) + { + value buff[2]; + ReadField(buff[0], pc, 0); + ReadField(buff[1], pc, 1); + Store_double_val(accu, (double)(*buff)); + } + else + { + double d = Double_field(accu, *pc); + Store_double_val(accu, d); + } pc++; Next; } Instruct(SETFIELD0): - modify_dest = &Field(accu, 0); modify_newval = *sp++; - modify: - Modify(modify_dest, modify_newval); + WriteField(accu, 0, modify_newval); accu = Val_unit; Next; Instruct(SETFIELD1): - modify_dest = &Field(accu, 1); modify_newval = *sp++; - goto modify; + WriteField(accu, 1, modify_newval); + accu = Val_unit; + Next; Instruct(SETFIELD2): - modify_dest = &Field(accu, 2); modify_newval = *sp++; - goto modify; + WriteField(accu, 2, modify_newval); + accu = Val_unit; + Next; Instruct(SETFIELD3): - modify_dest = &Field(accu, 3); modify_newval = *sp++; - goto modify; + WriteField(accu, 3, modify_newval); + accu = Val_unit; + Next; Instruct(SETFIELD): - modify_dest = &Field(accu, *pc); - pc++; modify_newval = *sp++; - goto modify; + WriteField(accu, *pc, modify_newval); + pc++; + accu = Val_unit; + Next; + Instruct(SETFLOATFIELD): - Store_double_field(accu, *pc, Double_val(*sp)); + //This will be problematic on architectures with aligned doubles. + if(log_heap) + { + value* d = (value*)(*sp); + value* f1 = (value*)accu + (*pc * 2); + value* f2 = (value*)accu + (*pc * 2 + 1); + WriteField(f1, 0, d[1]); + WriteField(f2, 0, d[2]); + } + else + Store_double_field(accu, *pc, Double_val(*sp)); accu = Val_unit; sp++; pc++; @@ -744,14 +814,14 @@ Next; } Instruct(GETVECTITEM): - accu = Field(accu, Long_val(sp[0])); + ReadField(accu, accu, Long_val(sp[0])); sp += 1; Next; Instruct(SETVECTITEM): - modify_dest = &Field(accu, Long_val(sp[0])); - modify_newval = sp[1]; + WriteField(accu, Long_val(sp[0]), sp[1]); sp += 2; - goto modify; + accu = Val_unit; + Next; /* String operations */ @@ -1145,4 +1215,18 @@ Assert(prog_size>0); } +/* Turn heap logging on. */ +value log_heap_on(value ignore) +{ + log_heap = 1; + return Val_unit; +} + +/* Turn heap logging off. */ +value log_heap_off(value ignore) +{ + log_heap = 0; + return Val_unit; +} + /* eof $Id: interp.c,v 1.90 2004/06/12 10:40:52 xleroy Exp $ */ diff -uNr /tmp/ocaml-3.08.1/byterun/tecaml.h ocaml-3.08.1/byterun/tecaml.h --- /tmp/ocaml-3.08.1/byterun/tecaml.h 1969-12-31 16:00:00.000000000 -0800 +++ ocaml-3.08.1/byterun/tecaml.h 2008-04-08 15:06:33.992599463 -0700 @@ -0,0 +1,14 @@ +/* File by Matthew Kehrt + * TE Caml declarations that don't go elsewhere */ + +#ifndef _TECAML_H_ +#define _TECAML_H_ + +/* Turn heap logging on and off. */ +value log_heap_on(value after); +value log_heap_off(value after); + +/* Is heap logging on? */ +int log_heap; + +#endif /* _TE_CAML_H_ */ diff -uNr /tmp/ocaml-3.08.1/otherlibs/threads/condition.ml ocaml-3.08.1/otherlibs/threads/condition.ml --- /tmp/ocaml-3.08.1/otherlibs/threads/condition.ml 2001-12-07 05:40:21.000000000 -0800 +++ ocaml-3.08.1/otherlibs/threads/condition.ml 2008-04-08 15:07:04.263902237 -0700 @@ -22,15 +22,19 @@ Mutex.unlock mut; cond.waiting <- Thread.self() :: cond.waiting; Thread.sleep(); - Mutex.lock mut + Mutex.lock mut; + flush stdout let signal cond = match cond.waiting with (* atomic *) [] -> () - | th :: rem -> cond.waiting <- rem (* atomic *); Thread.wakeup th + | th :: rem -> cond.waiting <- rem; (* atomic *) + try Thread.wakeup th with _ -> () let broadcast cond = let w = cond.waiting in (* atomic *) cond.waiting <- []; (* atomic *) - List.iter Thread.wakeup w + List.iter + (fun t -> try Thread.wakeup t with _ -> ()) + w diff -uNr /tmp/ocaml-3.08.1/otherlibs/threads/.depend ocaml-3.08.1/otherlibs/threads/.depend --- /tmp/ocaml-3.08.1/otherlibs/threads/.depend 2003-12-15 10:10:51.000000000 -0800 +++ ocaml-3.08.1/otherlibs/threads/.depend 2008-04-08 15:07:55.237463748 -0700 @@ -1,28 +1,32 @@ scheduler.o: scheduler.c ../../byterun/alloc.h \ ../../byterun/compatibility.h ../../byterun/misc.h \ - ../../byterun/config.h ../../config/m.h ../../config/s.h \ - ../../byterun/mlvalues.h ../../byterun/backtrace.h \ - ../../byterun/callback.h ../../byterun/fail.h ../../byterun/io.h \ + ../../byterun/config.h ../../byterun/../config/m.h \ + ../../byterun/../config/s.h ../../byterun/mlvalues.h \ + ../../byterun/backtrace.h ../../byterun/callback.h \ + ../../byterun/config.h ../../byterun/fail.h ../../byterun/io.h \ ../../byterun/memory.h ../../byterun/gc.h ../../byterun/major_gc.h \ - ../../byterun/freelist.h ../../byterun/minor_gc.h \ - ../../byterun/printexc.h ../../byterun/roots.h ../../byterun/signals.h \ - ../../byterun/stacks.h ../../byterun/sys.h + ../../byterun/freelist.h ../../byterun/minor_gc.h ../../byterun/misc.h \ + ../../byterun/mlvalues.h ../../byterun/printexc.h ../../byterun/roots.h \ + ../../byterun/memory.h ../../byterun/signals.h ../../byterun/stacks.h \ + ../../byterun/sys.h ../../byterun/tecaml.h condition.cmi: mutex.cmi -thread.cmi: unix.cmi -threadUnix.cmi: unix.cmi +thread.cmi: unix.cmo +threadUnix.cmi: unix.cmo condition.cmo: mutex.cmi thread.cmi condition.cmi condition.cmx: mutex.cmx thread.cmx condition.cmi event.cmo: condition.cmi mutex.cmi event.cmi event.cmx: condition.cmx mutex.cmx event.cmi -marshal.cmo: pervasives.cmi marshal.cmi -marshal.cmx: pervasives.cmx marshal.cmi +marshal.cmo: pervasives.cmo +marshal.cmx: pervasives.cmx mutex.cmo: thread.cmi mutex.cmi mutex.cmx: thread.cmx mutex.cmi -pervasives.cmo: unix.cmi pervasives.cmi -pervasives.cmx: unix.cmx pervasives.cmi -thread.cmo: unix.cmi thread.cmi +pervasives.cmo: unix.cmo +pervasives.cmx: unix.cmx +thread.cmo: unix.cmo thread.cmi thread.cmx: unix.cmx thread.cmi -threadUnix.cmo: thread.cmi unix.cmi threadUnix.cmi +threadUnix.cmo: thread.cmi unix.cmo threadUnix.cmi threadUnix.cmx: thread.cmx unix.cmx threadUnix.cmi -unix.cmo: unix.cmi -unix.cmx: unix.cmi +txevent.cmo: condition.cmi mutex.cmi thread.cmi unique.cmi txevent.cmi +txevent.cmx: condition.cmx mutex.cmx thread.cmx unique.cmx txevent.cmi +unique.cmo: mutex.cmi unique.cmi +unique.cmx: mutex.cmx unique.cmi diff -uNr /tmp/ocaml-3.08.1/otherlibs/threads/Makefile ocaml-3.08.1/otherlibs/threads/Makefile --- /tmp/ocaml-3.08.1/otherlibs/threads/Makefile 2003-07-17 01:38:28.000000000 -0700 +++ ocaml-3.08.1/otherlibs/threads/Makefile 2008-04-08 15:07:46.868550651 -0700 @@ -23,7 +23,7 @@ C_OBJS=scheduler.o -CAML_OBJS=thread.cmo mutex.cmo condition.cmo event.cmo threadUnix.cmo +CAML_OBJS=thread.cmo mutex.cmo condition.cmo event.cmo unique.cmo txevent.cmo threadUnix.cmo LIB=../../stdlib @@ -103,8 +103,8 @@ mkdir -p $(LIBDIR)/vmthreads cp libvmthreads.a $(LIBDIR)/vmthreads/libvmthreads.a cd $(LIBDIR)/vmthreads; $(RANLIB) libvmthreads.a - cp thread.cmi mutex.cmi condition.cmi event.cmi threadUnix.cmi threads.cma stdlib.cma unix.cma $(LIBDIR)/vmthreads - cp thread.mli mutex.mli condition.mli event.mli threadUnix.mli $(LIBDIR)/vmthreads + cp thread.cmi mutex.cmi condition.cmi event.cmi unique.cmi txevent.cmi threadUnix.cmi threads.cma stdlib.cma unix.cma $(LIBDIR)/vmthreads + cp thread.mli mutex.mli condition.mli event.mli unique.mli txevent.mli threadUnix.mli $(LIBDIR)/vmthreads installopt: diff -uNr /tmp/ocaml-3.08.1/otherlibs/threads/mutex.ml ocaml-3.08.1/otherlibs/threads/mutex.ml --- /tmp/ocaml-3.08.1/otherlibs/threads/mutex.ml 2001-12-07 05:40:22.000000000 -0800 +++ ocaml-3.08.1/otherlibs/threads/mutex.ml 2008-04-08 15:07:14.797051465 -0700 @@ -35,5 +35,5 @@ let w = m.waiting in (* atomic *) m.waiting <- []; (* atomic *) m.locked <- false; (* atomic *) - List.iter Thread.wakeup w + List.iter (fun th -> try Thread.wakeup th with _ -> ()) w diff -uNr /tmp/ocaml-3.08.1/otherlibs/threads/scheduler.c ocaml-3.08.1/otherlibs/threads/scheduler.c --- /tmp/ocaml-3.08.1/otherlibs/threads/scheduler.c 2003-12-29 14:15:02.000000000 -0800 +++ ocaml-3.08.1/otherlibs/threads/scheduler.c 2008-04-09 14:11:28.555721367 -0700 @@ -33,6 +33,7 @@ #include "signals.h" #include "stacks.h" #include "sys.h" +#include "tecaml.h" #if ! (defined(HAS_SELECT) && \ defined(HAS_SETITIMER) && \ @@ -62,6 +63,9 @@ #define O_NONBLOCK O_NDELAY #endif +/* fwd pointer to thread_kill --mkehrt */ +value thread_kill(value thread); + /* Configuration */ /* Initial size of stack when a thread is created (4 Ko) */ @@ -69,6 +73,13 @@ /* Max computation time before rescheduling, in microseconds (50ms) */ #define Thread_timeout 50000 +#define START_MAX_TX_PREEMPTIONS 1024 +#define MAX_MAX_TX_PREEMPTIONS 1024 + +/* Keep track of how long we've been doing a search */ +int doing_preemption_counts = 0; +int max_preemptions = START_MAX_TX_PREEMPTIONS; +int preemptions_passed = 0; /* The thread descriptors */ @@ -92,6 +103,8 @@ value joining; /* Thread we're trying to join */ value waitpid; /* PID of process we're waiting for */ value retval; /* Value to return when thread resumes */ + value log_heap; /* is heap logging on for this heap? */ + value thread_type; /* search thread or program thread? */ }; typedef struct caml_thread_struct * caml_thread_t; @@ -121,6 +134,12 @@ #define DELAY_INFTY 1E30 /* +infty, for this purpose */ +/* Search thread or normal thread */ +#define PROGRAM_THREAD Val_int(0) +#define SEARCH_THREAD Val_int(1) + +value curr_thread_type; + /* The thread currently active */ static caml_thread_t curr_thread = NULL; /* Identifier for next thread creation */ @@ -187,6 +206,10 @@ /* Initialize GC */ prev_scan_roots_hook = scan_roots_hook; scan_roots_hook = thread_scan_roots; + /* set initial heap logging to off */ + curr_thread->log_heap = Val_int(log_heap); + curr_thread->thread_type = PROGRAM_THREAD; + curr_thread_type = PROGRAM_THREAD; /* Set standard file descriptors to non-blocking mode */ stdin_initial_status = fcntl(0, F_GETFL); stdout_initial_status = fcntl(1, F_GETFL); @@ -257,6 +280,8 @@ th->joining = NO_JOINING; th->waitpid = NO_WAITPID; th->retval = Val_unit; + th->log_heap = Val_int(0); + th->thread_type = PROGRAM_THREAD; /* Insert thread in doubly linked list of threads */ th->prev = curr_thread->prev; th->next = curr_thread; @@ -303,6 +328,37 @@ /* Don't allow preemption during a callback */ if (callback_depth > 1) return curr_thread->retval; + /* Check to see if we have been doing a search for a long time. + * If so, run some program threads for a while, double search + * time and try again later. If we've searched for too long, + * cancel the search. */ + if(doing_preemption_counts && + max_preemptions <= preemptions_passed) + { + preemptions_passed = 0; + if(curr_thread_type == SEARCH_THREAD) + { + fflush(stdout); + caml_callback(*caml_named_value("ml_kill_all"), (Val_unit)); + curr_thread_type = PROGRAM_THREAD; + } + else + { + max_preemptions *= 2; + if(max_preemptions > MAX_MAX_TX_PREEMPTIONS) + { + doing_preemption_counts = 0; + max_preemptions = START_MAX_TX_PREEMPTIONS; + caml_callback(*caml_named_value("ml_kill_restart_thread"), (Val_unit)); + } + else + { + caml_callback(*caml_named_value("ml_wake_restart_thread"), (Val_unit)); + curr_thread_type = SEARCH_THREAD; + } + } + } + /* Save the status of the current thread */ curr_thread->stack_low = stack_low; curr_thread->stack_high = stack_high; @@ -312,6 +368,7 @@ curr_thread->backtrace_pos = Val_int(backtrace_pos); curr_thread->backtrace_buffer = backtrace_buffer; curr_thread->backtrace_last_exn = backtrace_last_exn; + curr_thread->log_heap = Val_int(log_heap); try_again: /* Find if a thread is runnable. @@ -375,8 +432,37 @@ /* Find if a thread is runnable. */ run_thread = NULL; FOREACH_THREAD(th) - if (th->status == RUNNABLE) { run_thread = th; break; } + fflush(stdout); + if (th->status == RUNNABLE && th->thread_type == curr_thread_type) + { + run_thread = th; + break; + } END_FOREACH(th); + + /* If we've failed to find a search thread to schedule, cancel search and + * switch to program threads. + * This may screw up if search threads are blocking on I/O, but they + * really shouldn't be. */ + if(run_thread == NULL && curr_thread_type == SEARCH_THREAD) + { + doing_preemption_counts = 0; + max_preemptions = START_MAX_TX_PREEMPTIONS; + caml_callback(*caml_named_value("ml_kill_all"), (Val_unit)); + curr_thread_type = PROGRAM_THREAD; + caml_callback(*caml_named_value("ml_kill_restart_thread"), (Val_unit)); + if(curr_thread->status == KILLED) + goto try_again; + else + return thread_kill((value)curr_thread); + } + /* If the program is blocked, just run the search as long as possible. */ + else if (run_thread == NULL && curr_thread_type == PROGRAM_THREAD && + doing_preemption_counts) + { + preemptions_passed = max_preemptions ; + return(schedule_thread()); + } /* Do the select if needed */ if (need_select || run_thread == NULL) { @@ -495,6 +581,7 @@ Assign(run_thread->joining, NO_JOINING); run_thread->waitpid = NO_WAITPID; + /* Activate the thread */ curr_thread = run_thread; stack_low = curr_thread->stack_low; @@ -505,6 +592,7 @@ backtrace_pos = Int_val(curr_thread->backtrace_pos); backtrace_buffer = curr_thread->backtrace_buffer; backtrace_last_exn = curr_thread->backtrace_last_exn; + log_heap = Int_val(curr_thread->log_heap); return curr_thread->retval; } @@ -514,7 +602,9 @@ static void check_callback(void) { if (callback_depth > 1) + { caml_fatal_error("Thread: deadlock during callback"); + } } /* Reschedule without suspending the current thread */ @@ -543,6 +633,7 @@ *--extern_sp = accu; } + /* Request a re-scheduling as soon as possible */ value thread_request_reschedule(value unit) /* ML */ @@ -552,6 +643,14 @@ return Val_unit; } +/* On preemption, up preemption cound and schedule a thread */ +value thread_preempt(value unit) +{ + if(doing_preemption_counts) + ++preemptions_passed; + return thread_request_reschedule(unit); +} + /* Suspend the current thread */ value thread_sleep(value unit) /* ML */ @@ -874,3 +973,117 @@ if (stdout_initial_status != -1) fcntl(1, F_SETFL, stdout_initial_status); if (stderr_initial_status != -1) fcntl(2, F_SETFL, stderr_initial_status); } + +/* Copy a thread */ +value th_fork(value ignore) /* ML */ +{ + asize_t size; + caml_thread_t th; + value * p; + + /* I don't think putting in the root annotations is needed here */ + Begin_root(ignore); + th = (caml_thread_t) alloc_shr(sizeof(struct caml_thread_struct) + / sizeof(value), 0); + End_roots(); + + th->ident = next_ident; + next_ident = Val_int(Int_val(next_ident) + 1); + + /* Copy stack */ + size = (stack_high - stack_low) * sizeof(value); + th->stack_low = (value *) stat_alloc(size); + memcpy(th->stack_low, stack_low, size); + th->stack_high = th->stack_low + (size / sizeof(value)); + th->stack_threshold = + th->stack_low + (stack_threshold - stack_low); + th->sp = th->stack_high - (stack_high - caml_extern_sp); + + /* Update trap pointers */ + th->trapsp = th->stack_high - (curr_thread->stack_high - trapsp); + for (p = th->trapsp; p < th->stack_high; p = Trap_link(p)) + Trap_link(p) = (value *) ((char *) th->stack_high - + ((char *) curr_thread->stack_high - + (char *) Trap_link(p))); + + th->backtrace_pos = Val_int(Int_val(curr_thread->backtrace_pos)); + + /* Copy backtrace buffer */ + if (backtrace_buffer != NULL) + { + th->backtrace_buffer = malloc(BACKTRACE_BUFFER_SIZE * sizeof(code_t)); + memcpy(th->backtrace_buffer, backtrace_buffer, + BACKTRACE_BUFFER_SIZE * sizeof(code_t)); + } + else + th->backtrace_buffer = NULL; + + /* Copy some other stuff */ + th->backtrace_last_exn = curr_thread->backtrace_last_exn; + th->status = curr_thread->status; + th->fd = curr_thread->fd; + th->readfds = curr_thread->readfds; + th->writefds = curr_thread->writefds; + th->exceptfds = curr_thread->exceptfds; + th->delay = curr_thread->delay; + th->joining = curr_thread->joining; + th->waitpid = curr_thread->waitpid; + th->log_heap = Val_int(log_heap); + th->thread_type = curr_thread->thread_type; + + /* Insert the thread in the queue. */ + th->prev = curr_thread->prev; + th->next = curr_thread; + Assign(curr_thread->prev->next, th); + Assign(curr_thread->prev, th); + + /* to prevent GC weirdness. */ + th->retval = Val_unit; + /* child gets Child */ + /* May cause a GC, but I think that is ok at this point. */ + Assign(th->retval, caml_alloc(1,0)); + Field(th->retval,0) = (value)curr_thread; + + /* return Parent(child) to parent */ + value ret = caml_alloc(1,1); + Field(ret,0) = (value)th; + + return ret; +} + +/* For reflecting first class heaps back into the real heap. */ +value th_modify(value *r, value iv, value v) /* ml */ +{ + int i = Int_val(iv); + Modify(&(Field(r,i)), v); + return Val_unit; +} + +/* For returning values that are not in the first class heap. */ +value th_field(value *r, value iv) /* ml */ +{ + int i = Int_val(iv); + return (Field(r,i)); +} + +/* Tell the scheduler a given thread is a search thread. */ +value set_search_thread(value th) /* ml */ +{ + ((caml_thread_t)th)->thread_type = SEARCH_THREAD; + return Val_unit; +} + +/* We want to run search threads now. */ +value set_search_mode(value unit) /* ml */ +{ + doing_preemption_counts = 1; + preemptions_passed = 0; + curr_thread_type = SEARCH_THREAD; + return unit; +} + +/* Are we in a search? */ +value is_syncing(value ignore) /* ml */ +{ + return curr_thread_type; +} diff -uNr /tmp/ocaml-3.08.1/otherlibs/threads/thread.ml ocaml-3.08.1/otherlibs/threads/thread.ml --- /tmp/ocaml-3.08.1/otherlibs/threads/thread.ml 2003-03-20 08:23:04.000000000 -0800 +++ ocaml-3.08.1/otherlibs/threads/thread.ml 2008-04-09 14:11:28.547720495 -0700 @@ -42,6 +42,7 @@ external thread_initialize_preemption : unit -> unit = "thread_initialize_preemption" external thread_new : (unit -> unit) -> t = "thread_new" external thread_yield : unit -> unit = "thread_yield" +external thread_preempt : unit -> unit = "thread_preempt" external thread_request_reschedule : unit -> unit = "thread_request_reschedule" external thread_sleep : unit -> unit = "thread_sleep" external thread_wait_read : Unix.file_descr -> unit = "thread_wait_read" @@ -131,7 +132,7 @@ (* Preemption *) let preempt signal = - if !critical_section then () else thread_request_reschedule() + if !critical_section then () else thread_preempt() (* Initialization of the scheduler *) @@ -139,3 +140,21 @@ thread_initialize(); Sys.set_signal Sys.sigvtalrm (Sys.Signal_handle preempt); thread_initialize_preemption() + +(* The result of a thread fork. Tell a thread what it is and the TCB + * of the other thread. *) +type forkresult = Child of t | Parent of t + +(* Stack copying thread creation *) +external th_fork : unit -> forkresult = "th_fork" +let fork () = th_fork () + +(* Primitives for TE *) +external setSearchThread : t -> unit = "set_search_thread" +external setSearchMode : unit -> unit = "set_search_mode" +external isSyncing : unit -> bool = "is_syncing" +external logHeapOn : unit -> unit = "log_heap_on" +external logHeapOff : unit -> unit = "log_heap_off" +external modify : unit -> int -> unit -> unit = "th_modify" +external field : unit -> int -> unit = "th_field" + diff -uNr /tmp/ocaml-3.08.1/otherlibs/threads/thread.mli ocaml-3.08.1/otherlibs/threads/thread.mli --- /tmp/ocaml-3.08.1/otherlibs/threads/thread.mli 2004-06-30 02:32:40.000000000 -0700 +++ ocaml-3.08.1/otherlibs/threads/thread.mli 2008-04-09 14:11:28.551720931 -0700 @@ -139,3 +139,18 @@ (** Reactivate the given thread. After the call to [wakeup], the suspended thread will resume execution at some future time. *) +(* The resuld of fork; tells the callee what it is + * and the thread control block of the other thread. *) +type forkresult = Child of t | Parent of t + +(* Stack copying thread creation. *) +val fork : unit -> forkresult + +(* Primitives for TE *) +val logHeapOn : unit -> unit +val logHeapOff : unit -> unit +val isSyncing : unit -> bool +val modify : unit -> int -> unit -> unit +val field : unit -> int -> unit +val setSearchThread : t -> unit +val setSearchMode : unit -> unit diff -uNr /tmp/ocaml-3.08.1/otherlibs/threads/txevent.ml ocaml-3.08.1/otherlibs/threads/txevent.ml --- /tmp/ocaml-3.08.1/otherlibs/threads/txevent.ml 1969-12-31 16:00:00.000000000 -0800 +++ ocaml-3.08.1/otherlibs/threads/txevent.ml 2008-04-09 14:11:28.563722240 -0700 @@ -0,0 +1,922 @@ +(* File by Matthew Kehrt. + * + * Transactional events for ML. See txevent.mli *) + +let debug = false + +(* Bind some modules *) +module SyncId = Unique +module DepMap = Map.Make(SyncId) +module ThreadMap = Map.Make(struct type t = int let compare = (-) end) +module SyncIdMap = Map.Make(struct type t = SyncId.t let compare = SyncId.compare end) + +exception Impossible of string + +(* Paths are descriptions of the nondeterministic choices a search thread has + * made *) +type pathEl = + Left +| Right +| Send of trail +| Recv of trail +| Heap of heappath +and path = pathEl list + +(* A finished in a heap path indicates that the thread associated with it + * successfully completed. *) +and finished = traildepmap * (unit -> unit) +and finishedMap = finished DepMap.t + +(* Heappaths describe the search threads that have used a given heap. *) +and heappath = (SyncId.t * path * bool ref * finished option) list +and heap = (unit * int * unit) list * heappath + +(* The current state of a given search thread. *) +and state = (trail * traildepmap * heap) +and trail = (SyncId.t * path * bool ref) +and traildepmap = trail DepMap.t + +(* Events are functions from state to a new state and a value. *) +type 'a event = state -> (state * 'a) + +(* Channels list senders and receivers that are waiting on these channels. *) +type 'a senders = Ss of (Condition.t * ('a * state) * 'a receivers) list ref +and 'a receivers = Rs of (Condition.t * state * 'a senders) list ref +type 'a chan = 'a senders * 'a receivers * Mutex.t + +(* SOme printing functions *) +let rec pathElToString pe = + match pe with + Left -> "L" + | Right -> "R" + | Send (_,p,_) -> + "S(" ^ pathToString p ^ ")" + | Recv (_,p,_) -> + "R(" ^ pathToString p ^ ")" + | Heap hp -> "H" + +and pathToString p = "[" ^ String.concat ", " (List.map pathElToString p) ^ "]" +and trailToString ((t,p,_):trail) = "(" ^ SyncId.toString t ^ ":[" ^ pathToString p ^ "])" + +and heappathToString hp = + "[" ^ (String.concat "]~[" + (List.map + (fun (t,p,_,_) -> (SyncId.toString t) ^ ":" ^ pathToString p) + hp)) + ^ "]" + +let heapToString (_,hp) = "(_,["^heappathToString hp^"])" + +let stateToString ((t,d,h):state) = + "(" ^ trailToString t ^ ", " ^ "_" ^ ", " ^ heapToString h ^ ")" + +(* List of all the current heaps in the pool. *) +let (heaps : (heap * trail DepMap.t) list ref) = ref [] +(* List of all the search threads waiting for a new heap. *) +let heapWaiters = ref [] +let heapWaitersLock = Mutex.create () + +(* When we are in user code, use this to tell which heap to write to. *) +let statemap = ref ThreadMap.empty + +(* for debugging *) +let gettid () = Thread.id(Thread.self ()) +let gettids () = string_of_int (gettid ()) + +(* Debugging print. *) +let ps s = + if debug + then begin + print_string (gettids () ^ " : " ^ s ^ "\n"); + flush stdout + end + else () + +(* Get the state associated with the current thread when in user code. *) +let getstate () = + try ThreadMap.find (gettid ()) (!statemap) + with Not_found -> + begin + raise Not_found + end + +(* Information on how to clean up a failed search. *) +let cleanupLock = Mutex.create () +let cleanup = ref SyncIdMap.empty + +let heapLock = Mutex.create () +(* Create a new heap. *) +let newHeap () = + (([],[]),DepMap.empty) + +(* Die if the sync that caused this search thread has alrady succeeded or if we + * are killed when the search fails or times out. *) +let checkThread br mo = + if not (!br) + then + let _ = ps "Failed thread check. Dying." in + begin + (match mo with + Some m -> Mutex.unlock m + | None -> ()); + Thread.exit () + end + else () + +(* Add a thread to be killed synchronously if the search fails. Done before + * entering user code or waiting on a condition to make sure we are killed even + * if we never come back *) +let tcbCleanup t br = + Mutex.lock cleanupLock; + Thread.critical_section := true; + try + ps ("Looking up cleanup for sid " ^ (SyncId.toString t)); + let (br,tcbs,tids) = SyncIdMap.find t (!cleanup) in + let tcb = Thread.self () in + cleanup := SyncIdMap.add t (br,tcb::tcbs,tids) (!cleanup); + Mutex.unlock cleanupLock; + Thread.critical_section := false + with Not_found -> + begin + Mutex.unlock cleanupLock; + Thread.critical_section := false; + if not (!br) then Thread.exit () else (); + ps "Could not find cleanup info in tcbCleanup."; + raise Not_found + end + +(* Remove such cleanup information. *) +let tcbRemoveCleanup t br = + Mutex.lock cleanupLock; + Thread.critical_section := true; + try + let (br,tcbs,tids) = SyncIdMap.find t (!cleanup) in + let tcb = Thread.self () in + let tcbs' = List.filter (fun tcb' -> not (tcb == tcb')) tcbs in + cleanup := SyncIdMap.add t (br,tcbs',tids) (!cleanup); + Mutex.unlock cleanupLock; + Thread.critical_section := false + with Not_found -> + begin + Mutex.unlock cleanupLock; + Thread.critical_section := false; + if not (!br) then Thread.exit () else (); + ps "Could not find cleanup info in tcbRemoveCleanup."; + raise Not_found + end + +(* Register cleanup information for the state map, which maos from tids to + * states of programs in user code. If we are syncronously killed in user code, + * this will allow our element in the statemap to be removed. *) +let tidCleanup t br = + Mutex.lock cleanupLock; + try + let (br,tcbs,tids) = SyncIdMap.find t (!cleanup) in + let tid = Thread.id (Thread.self ()) in + cleanup := SyncIdMap.add t (br,tcbs,tid::tids) (!cleanup); + Mutex.unlock cleanupLock + with Not_found -> + begin + Mutex.unlock cleanupLock; + if not (!br) then Thread.exit () else (); + ps "Could not find cleanup info in tidCleanup."; + raise Not_found + end + +(* Unregister tid information. *) +let tidRemoveCleanup t br = + Mutex.lock cleanupLock; + try + let (br,tcbs,tids) = SyncIdMap.find t (!cleanup) in + let tid = Thread.id (Thread.self ()) in + let tids' = List.filter (fun tid' -> not (tid = tid')) tids in + cleanup := SyncIdMap.add t (br,tcbs,tids') (!cleanup); + Mutex.unlock cleanupLock + with Not_found -> + begin + Mutex.unlock cleanupLock; + if not (!br) then Thread.exit () else (); + ps "Could not find cleanup info in tidRemoveCleanup."; + raise Not_found + end + +let statemapLock = Mutex.create () + +(* Register a state before going into user code. *) +let registerState (((t,_,_),_,_) as s) = + let tid = gettid () in + let _ = Mutex.lock statemapLock in + let _ = statemap := ThreadMap.add (tid) (ref s) (!statemap) in + Mutex.unlock statemapLock + +(* Unregister the state. *) +let unregisterState () = + let tid = gettid () in + let _ = Mutex.lock statemapLock in + let _ = statemap := ThreadMap.remove (tid) (!statemap) in + Mutex.unlock statemapLock + +(* Some equality functions. *) +let rec pathEq p p' = + try + List.for_all2 pathElEq p p' + with _ -> false + +and pathElEq e1 e2 = + match (e1,e2) with + (Left,Left) -> true + | (Right,Right) -> true + | (Send (t,p,_),Send (t',p',_)) -> + SyncId.eq t t' && pathEq p p' + | (Recv (t,p,_),Recv (t',p',_)) -> + SyncId.eq t t' && pathEq p p' + | (Heap hp, Heap hp') -> heappathEq hp hp' + | _ -> false + +(* Is path l an extension of path s? *) +and isExtension' l s = + match (l,s) with + (_,[]) -> true + | ([],hd::tl) -> false + | (pel::tll,pes::tls) -> + if pathElEq pel pes + then isExtension' tll tls + else false +and isExtension l s = isExtension' (List.rev l) (List.rev s) + +and heappathElEq (t,p,_,_) (t',p',_,_) = + SyncId.eq t t' && pathEq p p' + +and heappathEq hp hp' = + try + List.for_all2 heappathElEq hp hp' + with _ -> false + +let heapEq (_,hp) (_,hp') = heappathEq hp hp' + +(* Does one heappath exctend another? *) +let rec isHeappathExtension' l s = + begin + match (l,s) with + (_,[]) -> true + | ([],hd::tl) -> false + | (hpel::tll,hpes::tls) -> + if heappathElEq hpel hpes + then isHeappathExtension' tll tls + else false + end +let isHeappathExtension l s = + isHeappathExtension' (List.rev l) (List.rev s) + +(* Same, but wrapped in a heap. *) +let isHeapExtension (_,l) (_,s) = isHeappathExtension l s + +(* Add to a map. *) +let adddep d t p = (DepMap.add t p d) + +(* Create a new, empty channel. *) +let newChan () = (Ss (ref []), Rs (ref []), Mutex.create ()) + +(* aLwaysEvt just succeeds immediately. *) +let alwaysEvt a (((_,_,br),_,_) as s) = + ps "In always"; + let _ = checkThread br None in + (s,a) + +(* neverEvt never succeeds. *) +let rec neverEvt s = + begin + ps "In never"; + (* Thread.exit : unit -> unit ??? *) + Thread.exit (); + raise (Impossible "failed to exit thread?") + end + +(* Choose evt forks of threads to explore each nondeterministic branch. *) +let chooseEvt e1 e2 (((t,p,br),d, h) as s) = + let _ = ps "In choose." in + let _ = checkThread br None in + match Thread.fork () with + Thread.Parent _ -> e1 ((t,(Left::p),br),adddep d t (t,(Left::p),br), h) + | Thread.Child _ -> + begin + checkThread br None; + e2 ((t,(Right::p),br),adddep d t (t,(Right::p),br), h) + end + +(* thenEvt sequence events. As part of this, it calls arbitrary user code. + * When going into such code, we turn on heap logging and register our state so + * that user code knows what heap to write to. *) +let thenEvt e f (((t,_,br),_,_) as s) = + let logOff v = (Thread.logHeapOff (); v) in + let ((((t',_,br'),_,_) as s'), v') = e s in + let _ = checkThread br None in + tcbCleanup t' br'; + tidCleanup t' br'; + registerState s'; + ps "In then."; + Thread.logHeapOn (); + let e'' = logOff (f v') in + ps "Out of then"; + let s'' = + try !(ThreadMap.find (gettid()) (!statemap)) + with Not_found -> + begin + ps "Could not find statemap in thenEvt."; + raise Not_found + end + in + unregisterState (); + tidRemoveCleanup t br; + tcbRemoveCleanup t br; + let ((((_,_,br'''),_,_) as s'''),_) as res = e'' s'' in + let _ = checkThread br''' None in + res + +(* Given two dependency sets, check to see if they are coherent and return their + * union. *) +let cohereUnion (d,h) (d',h') = + let f otherdep k (t,p,b) acc = + match acc with + Some accdep -> + begin + try + let (t',p',b') = DepMap.find k otherdep in + if SyncId.eq t t' + then + begin + if isExtension p p' then Some (DepMap.add k (t,p,b) accdep) + else if isExtension p' p then Some (DepMap.add k (t',p',b) accdep) + else None + end + else None + with Not_found -> Some (DepMap.add k (t,p,b) accdep) + end + | None -> None + in + let hopt = if isHeapExtension h h' then Some (h) + else if isHeapExtension h' h then Some (h') + else None in + let depopt = DepMap.fold (f d') d (Some DepMap.empty) in + let depopt' = DepMap.fold (f d) d' depopt in + match (depopt',hopt) with + (Some d, Some h) -> Some (d,h) + | _ -> None + +(* Check to make sure a trail satisfies a dependency. *) +let checkExtension k (t,p,b) d = + try + let (t',p', b') = (DepMap.find k d) in + isExtension p p' && SyncId.eq t t' + with Not_found -> true + +(* My race needs monads to survive. *) +let foldo f i l = + List.fold_left + (fun acc curr -> + match acc with + Some acc -> f acc curr + | None -> None) + i l + +(* Check to make sure two depencency maps are satisfied, and, if so, return + * their union. + * This will possibly fail if mulitple requirements in a set are + * for the same SyncId. As I never actually do this, I don't so much care *) +let cohere ttrl1 hl1 (* requirements imposed on deps 2 *) + dh1 (* deps 1 *) + ttrl2 hl2 (* requirements imposed on deps 1 *) + dh2 (* deps 2 *) + = + let f dh ttrl hl = + foldo + (fun (d,h) h' -> + if isHeapExtension h' h + then Some (d, h') + else None + ) + (foldo + (fun (d,h) (t, tr) -> + if checkExtension t tr d + then Some (adddep d t tr,h) + else None) + (Some dh) ttrl) + hl in + match (f dh1 ttrl2 hl2, f dh2 ttrl1 hl1) with + (Some dh1', Some dh2') -> cohereUnion dh1' dh2' + | _ -> None + +(* Lazy iteration. Used for creating multiple threads for each element of a + * list. *) +let rec iter (l:'a list) (f:('a -> (unit -> 'b) -> 'b)) (b:unit->'b) = + match l with + [] -> b () + | hd::tl -> f hd (fun () -> iter tl f b) + +(* condwait wants a mutex but we don't want to + * be killed while holding a useful one. *) +let dumbLock = Mutex.create () +(* heapLock should be locked *) +(* may actually return *) +(* Safely wait on a condition, *) +let rec wait c m v f t br () = + let _ = Mutex.unlock m in + let _ = tcbCleanup t br in + let _ = ps "Waiting." in + let _ = Condition.wait c dumbLock in + let _ = ps "Was signalled." in + let _ = Mutex.unlock dumbLock in + let _ = Mutex.lock m in + let _ = tcbRemoveCleanup t br in + let _ = checkThread br (Some m) + in + let wait' = wait c m v f t br in + f v wait' + +(* Wait on a condition, then loop over a list ref and go back to sleep. *) +let rec waitThenLoop c m v loop base t br () = + wait c m v + (fun l it -> iter (!l) (loop) (fun () -> base it)) + t br () + +(* call with thlm locked *) +(* Loop over a list, doing something each time, and then sleep, waiting for new + * elements to be added to the list. *) +let waitForThing thl thlm wl wlm f stuff rlf t br = + let c = Condition.create () in + let rl = ref [] in + ps ("about to iter in wft"); + iter + (!thl) f + (fun () -> + begin + ps ("itering..."); + if not (wlm == thlm) then Mutex.lock wlm; + wl := (c,stuff,rlf rl)::(!wl); + if not (wlm == thlm) then Mutex.unlock wlm; + waitThenLoop c thlm rl f + (fun k -> ( + if not (wlm == thlm) then Mutex.lock wlm; + rl := []; + if not (wlm == thlm) then Mutex.unlock wlm; + k ())) + t br () + end ) + +(* Kill everything in a given cleanup tuple. Called by below functions. *) +let kill (br,waitingTcbs,tids) = + begin + br := false; + List.iter (fun tcb -> if tcb == Thread.self () + then ps "Not killing ourself." + else begin + ps ("Synchronously killing thread " ^ string_of_int (Thread.id tcb)); + try Thread.kill tcb + with Failure "Thread.kill: killed thread" -> () + end) + waitingTcbs; + List.iter + (fun tid -> try statemap := ThreadMap.remove tid (!statemap) + with Not_found -> + (ps "not found in statemap in kill?")) + tids; + end + +(* The Deplorable Word... + * Kills every thread associated with a search, asynchronously if possible, but + * synchronously if necessary. + called atomicly from C *) +let killAll () = + begin + ps "Killing all search threads."; + SyncIdMap.iter + (fun t -> kill) + (!cleanup); + cleanup := SyncIdMap.empty; + heapWaiters := []; + ps "done killing everyone." + end +let _ = Callback.register "ml_kill_all" killAll + +(* Kill every search thread associated with a given program thread. *) +let killById t = + Mutex.lock cleanupLock; + kill (SyncIdMap.find t (!cleanup)); + ps ("Removing cleanup for sid " ^ (SyncId.toString t)); + cleanup := SyncIdMap.remove t (!cleanup); + Mutex.unlock cleanupLock; + Mutex.lock heapWaitersLock; + heapWaiters := List.filter (fun (_,t',_) -> not (SyncId.eq t t')) + (!heapWaiters); + Mutex.unlock heapWaitersLock + +(* Inform the threads waiting for a heap thst there is a new one available. *) +let signalHeapWaiters h = + let f (c,_,l) = + begin + l := h::(!l); + ps ("is signalling heap waiters."); + Condition.broadcast c + end + in + Mutex.lock heapWaitersLock; + List.iter f (!heapWaiters); + Mutex.unlock heapWaitersLock + +(* call while locked *) +(* Clean out waiters on a channel that have already died. *) +let cleanChan sendersr recversr = + let spred (_,(_,((_,_,br),_,_)),_) = !br in + let rpred (_,((_,_,br),_,_),_) = !br in + begin + sendersr := List.filter spred (!sendersr); + recversr := List.filter rpred (!recversr); + end + +(* call while locked *) +(* Add a heap to the set of heaps if it is not already there. This includes + * calculating the new state for the search thread checking the heap in. *) +let checkinHeap ((t,p,br),d,(hl,hp)) fo = + let _ = ps "Attempting to check in heap." in + let peh = Heap hp in + let p' = peh::p in + let tr' = (t,p',br) in + let d' = adddep d t tr' in + let dfo = (match fo with + Some f -> Some (d,f) + | None -> None) in + let hpe = (t,p',br,dfo) in + let hp' = hpe::hp in + let _ = try ignore (List.find (fun (oh,_) -> heapEq (hl,hp') oh) (!heaps)) + with Not_found -> + begin + Mutex.lock heapLock; + heaps := (((hl,hp'),d')::(!heaps)); + ps ("Adding new heap" ^heapToString (hl,hp')^"; there are " ^ string_of_int (List.length (!heaps)) ^ " heaps."); + signalHeapWaiters ((hl,hp'),d'); + Mutex.unlock heapLock + end in + (tr', d', (hl,hp')) + +(* Preficates to see if senders and receivers are compatible, and, if so, spawn + * a new search thread. *) +let checkSender (((t,_,_) as tr),d,h) + (_,(v',(((t',_,_) as tr'),d',h')),_) it = + match cohere [(t,tr)] [] (d,h) [(t',tr')] [] (d',h') with + Some (deps, h) -> + begin + match Thread.fork () with + Thread.Child _ -> (v',tr',deps,h) + | Thread.Parent _ -> it () + end + | None -> it () + +let checkRecver (((t,_,_) as tr),d,h) + (_,((((t',_,_) as tr'),d',h')),_) it = + match cohere [(t,tr)] [] (d,h) [(t',tr')] [] (d',h') with + Some (deps,h) -> + begin + match Thread.fork () with + Thread.Child _ -> (tr',deps,h) + | Thread.Parent _ -> it () + end + | None -> it () + +(* See if a heap is coherent with a search thread, and, if so, spawn a new + * thread to search that combination. *) +let checkHeap d t tr ((((hl,hp) as h),hd)) it = + ps ("In checkHeap; about to check coherence."); + match cohere [(t,tr)] [] d [] [h] (hd,h) with + Some (hd',h') -> + begin + ps ("In checkHeap; coherent."); + match Thread.fork () with + Thread.Parent _ -> it () + | Thread.Child _ -> (h,hd') + end + | None -> (ps ("In checkHeap; NOT coherent.") ; it ()) + (* else it () *) + +(* Signal any senders that a new receiver has received on this channel, and they + * should check if they are coherent. *) +let signalSenders senders m v = + let f (c,_,Rs l) = + begin + l := (Condition.create (), v, Ss(ref []))::(!l); + Condition.broadcast c + end + in + List.iter f (!senders) + +(* Dual. *) +let signalRecvers recvers m v = + let f (c,_,Ss l) = + begin + l := (Condition.create (), v, Rs(ref []))::(!l); + Condition.broadcast c + end + in + List.iter f (!recvers) + +(* sendEvt and recvEvt each check in a heap. Then they check to see if any + * existing threads want to communicate, spawning new search threads for every + * possible communication partner with which they are coherent. After they are + * out of communication partners, they sleep until a new possible partner + * attempts to communicate, which wakes them up. + * + * Then they do the same for any heaps with which they are coherent. The result + * is that any heap * communication partner pair with which wich they are + * coherent at any time in the search generates a new search thread. *) +let sendEvt (Ss sendersr, Rs recversr, m) v ((((t,p,br),d,((hl,hp) as h)) as s):state) = + let _ = ps ("In send.") in + let _ = checkThread br None in + let (((_,p',_),_,_) as s') = checkinHeap s None in + let _ = Mutex.lock m in + let _ = cleanChan sendersr recversr in + let _ = signalRecvers recversr m (v,s') in + (* The following expression returns multiple times: once for each possible + * communication partner. *) + let (trf,d'',h'') = + waitForThing recversr m sendersr m (checkRecver s') (v,s') (fun x -> Rs x) t br in + let pe = Send trf in + let p'' = pe::p' in + let tr'' = (t,p'',br) in + let _ = Mutex.lock heapLock in + (* The following expression returns multiple times: once for each possible + * heap. *) + let (h''',d''') = + waitForThing heaps heapLock heapWaiters heapWaitersLock + (checkHeap (d'',h'') t tr'') t (fun x -> x) t br + in + ((tr'',d''',h'''),()) + +(* Very similar to sendEvt *) +let recvEvt ( Ss sendersr, Rs recversr, m) ((((t,_,br),d,h) as s):state) = + let _ = ps ("In recv.") in + let _ = checkThread br None in + let ((_,p',_),_,_) as s' = checkinHeap s None in + let _ = Mutex.lock m in + let _ = cleanChan sendersr recversr in + let _ = signalSenders sendersr m s' in + (* The following expression returns multiple times: once for each possible + * communication partner. *) + let (vf,trf,d'',h'') = + waitForThing sendersr m recversr m (checkSender s') s' (fun x -> Ss x) t br in + let pe = Recv trf in + let p'' = pe::p' in + let tr'' = (t,p'',br) in + let _ = Mutex.lock heapLock in + let (h''',d''') = + (* The following expression returns multiple times: once for each possible + * heap. *) + waitForThing heaps heapLock heapWaiters heapWaitersLock + (checkHeap (d'',h'') t tr'') t (fun x -> x) t br + in + ((tr'',d''',h'''),vf) + + +(* Make sure every element of a map satisfies a predicate. *) +let mapForAll f m = + DepMap.fold + (fun k v a -> + a && f k v) + m true + +(* Make sure every path in list satisfies rvery dependency map. *) +let satisfiesDeps fm pm = + mapForAll + (fun _ (d,_) -> + mapForAll + (fun t (_,p,_) -> + isExtension (DepMap.find t pm) p) + d) + fm + +(* The two below functions make sure that a given heappath corresponds to a + * committable set of threads. We use the fact that a heappaath has witnessed a + * coherent set of threads from their beginnings to find this set. All we need + * to do is make sure all of the threads have all finished and satisfy each + * other's dependencies. *) +let rec findset' hp fm pm = + match hp with + [] -> Some (fm, pm) + | (t,p,b,Some (d,f))::tl -> + if DepMap.mem t fm + then None + else findset' tl (adddep fm t (d,f)) (adddep pm t p) + | (t,p,b,None)::tl -> + if DepMap.mem t fm + then if isExtension (DepMap.find t pm) p + then findset' tl fm pm + else None + else None + +let findset hp = + match findset' hp DepMap.empty DepMap.empty with + None -> None + | Some (fm,pm) -> + let f _ p = + match p with + (Heap hp')::_ -> isHeappathExtension hp hp' + | _ -> false in + if mapForAll f pm && satisfiesDeps fm pm + then Some fm + else None + + +let findSetLock = Mutex.create () + +(* Reflect a first class heap back to the real heap. *) +let reflect hl = + List.iter (fun (r,i,v) -> Thread.modify r i v) (List.rev hl) + +(* Wait for a starting heap for a given thread. Returns multiple times. *) +let waitForHeap dh t br = waitForThing heaps heapLock + heapWaiters heapWaitersLock + (checkHeap dh t (t,[],br)) t (fun x -> x) t br + +(* Returns all possible starting heaps for a given thread. *) +let initstate t br = + let d = DepMap.empty in + let _ = Mutex.lock heapLock in + let _ = ps ("Trying to get an initial heap, there are "^string_of_int + (List.length (!heaps))^" heaps.") in + let (h,d') = waitForHeap (d,([],[])) t br in + ps ("I got an initial heap."); + ((t, [], br), d', h) + +(* Spawn a given initial search thread. *) +let doSyncLoop (_,f) it = + let tcb = (Thread.create f ()) in + Thread.setSearchThread tcb; + it () + +(* Currently running syncs. *) +let syncers = ref [] +let syncersLock = Mutex.create () + +(* Actually perform a search for a given program thread, and check the results + * to see if we get a commibable set. If so, cancel the search, reflect the + * first class heap out and give the resulting values to the waiting program + * threads. *) +let doSync e res c t () = + let br = ref true in + let _ = Mutex.lock cleanupLock in + ps ("Adding cleanup for sid " ^ (SyncId.toString t)); + let _ = cleanup := SyncIdMap.add t (br,[],[]) (!cleanup) in + let _ = Mutex.unlock cleanupLock in + (* initstate returns multiple times, once for every possible initial + * heap. *) + let is = initstate t br in + let (s,v) = e (is:state) in + let _ = if not (!br) then Thread.exit () else () in + + let f () = + begin + res := Some v; + ps ("Signalling the victor."); + Condition.signal c; + killById t; + Mutex.lock syncersLock; + syncers := List.filter (fun (t',_) -> not (SyncId.eq t t')) (!syncers); + Mutex.unlock syncersLock + end + in + (* Check in final heap for a given search thread. *) + let ((_,p',br),_,((hl',hp') as h')) = checkinHeap s (Some f) in + let _ = ps ("Completed executing event with path " ^ + pathToString p' ^ "\n\t and heappath" ^ + heappathToString hp') in + Mutex.lock findSetLock; + checkThread br (Some findSetLock); + ps ("About to find set. Path is " ^ pathToString p' ^ "."); + ps ("Heappath is " ^ heappathToString (snd h') ^ "."); + begin + match findset hp' with + Some fm -> + begin + ps "Victory."; + DepMap.iter + (fun _ (_,f) -> f ()) + fm; + ps "reflecting."; + ps ("Heap now has size " ^ string_of_int (List.length hl') ^ "."); + Thread.critical_section := true; + reflect hl'; + Thread.critical_section := false; + ps "done reflecting."; + end + | None -> (); + end; + Mutex.unlock findSetLock; + Thread.exit () + +(* If the search times out and restarts later, this is the thread that restarts + * it. *) +let currRestartThread = ref None + +(* Set the restart thread. *) +let setRestartThread tcb = + currRestartThread := Some tcb + +(* Remove the restart thread on a finished search or too many timeouts. *) +let killRestartThread () = + match (!currRestartThread) with + None -> () + | Some tcb -> Thread.kill tcb +let _ = Callback.register "ml_kill_restart_thread" killRestartThread + +(* Wake up restart thread. *) +let wakeRestartThread () = + match (!currRestartThread) with + None -> raise (Impossible "Trying to wake a nonexistent restart thread.") + | Some tcb -> Thread.wakeup tcb +let _ = Callback.register "ml_wake_restart_thread" wakeRestartThread + +(* The restart thread just loops, rerunning all the initial search threads every + * time it is woken up. *) +let rec restartThread ss = + heaps := [newHeap ()]; + iter (ss) doSyncLoop (fun _ -> ()); + Thread.sleep (); + restartThread ss + +(* only one thread may be syncing at once *) +let syncLock = Mutex.create () + +(* Do the sync. *) +let sync (e : 'a event) = + if Thread.isSyncing () + then + (* A nested sync. *) + let _ = Thread.logHeapOff () in + ps ("In NS."); + let s = !(ThreadMap.find (gettid ()) (!statemap)) in + ps ("About to run nested event."); + let (s',v') = e s in + ps ("Done running nested event."); + registerState s'; + Thread.logHeapOn (); + v' + else + (* only one thread can be running the sync at once. No other program + * threads are scheduled after doSyncBase until the search succeeds or + * fails. + *) + let _ = Mutex.lock syncLock in + let _ = ps "in sync." in + let res = ref None in + let t = SyncId.create () in + let c = Condition.create () in + let _ = ps ("about to lock hl in sync.") in + let _ = Mutex.lock heapLock in + let _ = ps ("Locked hl in sync.") in + (* let _ = heaps := [newHeap ()] in *) + let _ = Mutex.unlock heapLock in + let _ = Mutex.lock syncersLock in + let _ = syncers := (t, doSync e res c t)::(!syncers) in + let ss = (!syncers) in + let _ = Thread.critical_section := true in + let _ = Mutex.unlock syncLock in + let _ = Mutex.unlock syncersLock in + let tcb = Thread.create restartThread ss in + let _ = Thread.setSearchThread tcb in + let _ = setRestartThread tcb in + let _ = Thread.setSearchMode () in + let _ = Condition.wait c dumbLock in + match (!res) with + Some v -> + begin + Mutex.unlock dumbLock; + v + end + (* an examination of the implementation of condtion signals seems to + * indicate that spurious wakeups are impossible, so this should never + * happen. *) + | None -> raise (Impossible "syncing thread woken up with no value.") + +(* Read from a first class heap. *) +(* called atomically, from C *) +let heapReadField r i = + let _ = ps "In ML read field." in + try + begin + let (_,_,(hl,_)) = !(getstate ()) in + let _ = ps ("Done; size of hl is " ^ string_of_int (List.length hl)) in + match (List.find (fun (r',i',_) -> r == r' && i = i') hl) with + (_,_,v) -> v + end + with Not_found -> (ps "returning normal val in read_field"; Thread.field r i) + +let _ = Callback.register "ml_heap_read_field" heapReadField + +(* "Write" to a first class heap. *) +(* called atomically, from C *) +let rec heapWriteField r i v = + let _ = ps ("In ML write field with " ^ string_of_int (Obj.magic v)) in + let sr = getstate () in + let (t,d,(hl,hp)) = !sr in + let _ = ps "got heap" in + sr := (t,d,((r,i,v)::hl,hp)) + +let _ = Callback.register "ml_heap_write_field" heapWriteField diff -uNr /tmp/ocaml-3.08.1/otherlibs/threads/txevent.mli ocaml-3.08.1/otherlibs/threads/txevent.mli --- /tmp/ocaml-3.08.1/otherlibs/threads/txevent.mli 1969-12-31 16:00:00.000000000 -0800 +++ ocaml-3.08.1/otherlibs/threads/txevent.mli 2008-04-08 15:17:32.500444068 -0700 @@ -0,0 +1,32 @@ +(* File by Matthew Kehrt. + * + * Interface for transactional events for Caml. Donnelly and + * Fluet introduced transactional events for Haskell. Such events + * are in the style of CML, but include a thenEvt combinator for + * sequencing events into a single event, synchronizing on which + * will cause the subevents to either all succeed or all fail + * together. This library extends these to OCaml. It allows + * impure code inside thenEvts and guarantees that the effects + * of this code will not be seen unless the event succeeds. + * + * See http://wasp.cs.washington.edu/tecaml for more details. *) + +(* A synchronous channel *) +type 'a chan + +(* A description of a communication to be performed. *) +type 'a event + +(* Create a channel. *) +val newChan : unit -> 'a chan + +(* Event combinators. *) +val alwaysEvt : 'a -> 'a event +val neverEvt : 'a event +val thenEvt : 'a event -> ('a -> ('b event)) -> 'b event +val chooseEvt : 'a event -> 'a event -> 'a event +val sendEvt : 'a chan -> 'a -> unit event +val recvEvt : 'a chan -> 'a event + +(* Sync performs the events. *) +val sync : 'a event -> 'a diff -uNr /tmp/ocaml-3.08.1/otherlibs/threads/unique.ml ocaml-3.08.1/otherlibs/threads/unique.ml --- /tmp/ocaml-3.08.1/otherlibs/threads/unique.ml 1969-12-31 16:00:00.000000000 -0800 +++ ocaml-3.08.1/otherlibs/threads/unique.ml 2008-04-08 15:17:22.151315062 -0700 @@ -0,0 +1,25 @@ +(* File by Matthew Kehrt. + * Generate unique ids for TE *) + +type t = int + +let compare x y = x - y + +let curr = ref 0 + +let m = Mutex.create () + +let create () = + begin + Mutex.lock m; + curr := (!curr) + 1; + let u = !curr in + Mutex.unlock m; + u + end + +let eq u1 u2 = + u1 = u2 + +let toString u = + string_of_int u diff -uNr /tmp/ocaml-3.08.1/otherlibs/threads/unique.mli ocaml-3.08.1/otherlibs/threads/unique.mli --- /tmp/ocaml-3.08.1/otherlibs/threads/unique.mli 1969-12-31 16:00:00.000000000 -0800 +++ ocaml-3.08.1/otherlibs/threads/unique.mli 2008-04-08 15:17:25.815714817 -0700 @@ -0,0 +1,8 @@ +(* File by Matthew Kehrt. + * Generate unique ids for TE. *) + +type t +val compare: t -> t -> int +val create : unit -> t +val eq : t -> t -> bool +val toString : t -> string