From f3faca3ee7d61b63535328f84ae7907c70c083d0 Mon Sep 17 00:00:00 2001 From: Salikh Zakirov Date: Tue, 20 Mar 2007 22:20:25 +0300 Subject: [PATCH] hythread_join(): reflect contract change in thread c unit tests hythread_join() accepts a thread block as a parameter, which is inherently unsafe if thread block has an explicit lifecycle, as the thread block can be freed simultaneously with other thread calling to hythread_join(). Thus, it is necessary to lock the global thread lock to make sure thread is not terminated while we are working with a thread block. --- vm/tests/unit/thread/test_native_basic.c | 6 +- vm/tests/unit/thread/test_native_fat_monitor.c | 2 +- vm/tests/unit/thread/test_native_suspend.c | 4 +- vm/tests/unit/thread/test_native_thin_monitor.c | 4 +- vm/tests/unit/thread/test_performance_basic.c | 6 +- .../thread/test_performance_concurrent_mutex.c | 2 +- .../unit/thread/utils/thread_unit_test_utils.c | 22 ++++- .../unit/thread/utils/thread_unit_test_utils.h | 1 + vm/thread/src/thread_native_basic.c | 118 +++++++++++++++++--- vm/thread/src/thread_native_latch.c | 8 +- vm/thread/src/thread_private.h | 3 + vm/vmcore/src/init/vm_shutdown.cpp | 35 ++++-- 12 files changed, 168 insertions(+), 43 deletions(-) diff --git a/vm/tests/unit/thread/test_native_basic.c b/vm/tests/unit/thread/test_native_basic.c index 27be05b..b2c3e49 100644 --- a/vm/tests/unit/thread/test_native_basic.c +++ b/vm/tests/unit/thread/test_native_basic.c @@ -42,6 +42,7 @@ int test_hythread_create(void){ void **args; hythread_t thread = NULL; int r; + hythread_iterator_t iterator; apr_pool_create(&pool, NULL); @@ -56,7 +57,8 @@ int test_hythread_create(void){ r = hythread_create_with_group(&thread, args[0], 1024000, 1, 0, start_proc, args); tf_assert(r == 0 && "thread creation failed"); - r = hythread_join(thread); + r = join_thread(thread, args[0]); + tf_assert(r == 0 && "thread join failed"); return TEST_PASSED; } @@ -174,7 +176,7 @@ int test_hythread_create_many(void){ sprintf(buf, "Thread %d\0", i); hythread_set_name(thread, buf); */ - r = hythread_join(thread); + r = join_thread(thread, group); tf_assert(r == 0 && "thread join failed"); } diff --git a/vm/tests/unit/thread/test_native_fat_monitor.c b/vm/tests/unit/thread/test_native_fat_monitor.c index 0fa29fa..ddb9055 100644 --- a/vm/tests/unit/thread/test_native_fat_monitor.c +++ b/vm/tests/unit/thread/test_native_fat_monitor.c @@ -94,7 +94,7 @@ int test_wait_signal(void){ } for (i = 0; i < NMB; i++){ jthread_sleep(100, 0); - hythread_join(threads[i]); + join_thread(threads[i], NULL); } return 0; } diff --git a/vm/tests/unit/thread/test_native_suspend.c b/vm/tests/unit/thread/test_native_suspend.c index 2c9c6b4..91c1152 100644 --- a/vm/tests/unit/thread/test_native_suspend.c +++ b/vm/tests/unit/thread/test_native_suspend.c @@ -62,7 +62,7 @@ int test_hythread_thread_suspend(void){ hythread_resume(thread); - hythread_join(thread); + join_thread(thread, NULL); tf_assert_same((int)args[1], 1); @@ -109,7 +109,7 @@ int test_hythread_thread_suspend_all(void){ hythread_resume_all(hythread_self()->group); - hythread_join(thread); + join_thread(thread, NULL); tf_assert_same((IDATA)args[1], 1); diff --git a/vm/tests/unit/thread/test_native_thin_monitor.c b/vm/tests/unit/thread/test_native_thin_monitor.c index 3817563..eafd2a1 100644 --- a/vm/tests/unit/thread/test_native_thin_monitor.c +++ b/vm/tests/unit/thread/test_native_thin_monitor.c @@ -148,7 +148,7 @@ int test_hythread_thin_monitor_enter_contended(void){ status = hythread_thin_monitor_exit(&lockword_ptr); tf_assert_same(status, TM_ERROR_NONE); hythread_suspend_enable(); - hythread_join(thread); + join_thread(thread, NULL); tf_assert_same((IDATA)args[1], 1); hythread_suspend_disable(); @@ -167,7 +167,7 @@ int test_hythread_thin_monitor_enter_contended(void){ hythread_suspend_enable(); tf_assert_same(status, TM_ERROR_NONE); - hythread_join(thread); + join_thread(thread, NULL); tf_assert_same((int)args[1], 1); diff --git a/vm/tests/unit/thread/test_performance_basic.c b/vm/tests/unit/thread/test_performance_basic.c index aa9bc44..47c6cef 100644 --- a/vm/tests/unit/thread/test_performance_basic.c +++ b/vm/tests/unit/thread/test_performance_basic.c @@ -325,7 +325,7 @@ int test_hythread_create(void) { thread = NULL; stat = hythread_create(&thread, 0, 0, 0, proc_empty, args); assert(!stat); - stat = hythread_join(thread); + stat = join_thread(thread, NULL); assert(!stat); } end = apr_time_now(); @@ -463,7 +463,7 @@ int test_hythread_set_private_data(void) { } stat = hycond_notify(tm_condition_lock); assert(!stat); - stat = hythread_join(thread); + stat = join_thread(thread, NULL); assert(!stat); stat = hymutex_destroy(&tm_mutex_lock); assert(!stat); @@ -559,7 +559,7 @@ int test_hythread_get_private_data(void) { } stat = hycond_notify(tm_condition_lock); assert(!stat); - stat = hythread_join(thread); + stat = join_thread(thread, NULL); assert(!stat); stat = hymutex_destroy(&tm_mutex_lock); assert(!stat); diff --git a/vm/tests/unit/thread/test_performance_concurrent_mutex.c b/vm/tests/unit/thread/test_performance_concurrent_mutex.c index 9acac5f..ab136d1 100644 --- a/vm/tests/unit/thread/test_performance_concurrent_mutex.c +++ b/vm/tests/unit/thread/test_performance_concurrent_mutex.c @@ -305,7 +305,7 @@ int test_hythread_cuncurrent_mutex_tm(hythread_t threads_array[], start = apr_time_now(); hycond_notify_all(tm_condition_lock); for (i = 0; i < THREADS_NUMBER; i++) { - hythread_join(threads_array[i]); + join_thread(threads_array[i], NULL); } end = apr_time_now(); stat = hymutex_destroy(&tm_concurrent_mutex_lock); diff --git a/vm/tests/unit/thread/utils/thread_unit_test_utils.c b/vm/tests/unit/thread/utils/thread_unit_test_utils.c index ab49558..251bdc4 100644 --- a/vm/tests/unit/thread/utils/thread_unit_test_utils.c +++ b/vm/tests/unit/thread/utils/thread_unit_test_utils.c @@ -408,7 +408,7 @@ void tested_thread_wait_dead(tested_thread_sturct_t *tts) { int i; i = 0; - while (hythread_join_timed(tts->native_thread, MAX_TIME_TO_WAIT, 0) == TM_ERROR_TIMEOUT) { + while (join_thread(tts->native_thread, NULL, MAX_TIME_TO_WAIT, 0) == TM_ERROR_TIMEOUT) { i++; printf("Thread %i isn't dead after %i milliseconds", tts->my_index, (i * MAX_TIME_TO_WAIT)); @@ -481,3 +481,23 @@ void JNICALL default_run_for_test(jvmtiEnv * jvmti_env, JNIEnv * jni_env, void * tested_thread_ended(tts); } +// safely join thread: +// a) grab the global thread lock +// b) make sure a passed thread is still attached to passed group +// and thus is alive +// c) call hythread_join() under global thread lock +int join_thread(hythread_t thread, hythread_group_t group, I_64 ms, UDATA ns) +{ + int r = 0; + hythread_iterator_t iterator = hythread_iterator_create(group); + while (hythread_iterator_has_next(iterator)) { + hythread_t t = hythread_iterator_next(&iterator); + if (t == thread) { + r = hythread_join_timed(thread, ms, ns); + // we must not use the same iterator after hythread_join() + break; + } + } + hythread_iterator_release(&iterator); + return r; +} diff --git a/vm/tests/unit/thread/utils/thread_unit_test_utils.h b/vm/tests/unit/thread/utils/thread_unit_test_utils.h index 9e9b818..8a841e7 100644 --- a/vm/tests/unit/thread/utils/thread_unit_test_utils.h +++ b/vm/tests/unit/thread/utils/thread_unit_test_utils.h @@ -122,3 +122,4 @@ jthread new_jobject_thread(JNIEnv * jni_env); jobject new_jobject_thread_death(JNIEnv * jni_env); jthread new_jobject(); void delete_jobject(jobject obj); +int join_thread(hythread_t thread, hythread_group_t group, I_64 ms, UDATA ns); diff --git a/vm/thread/src/thread_native_basic.c b/vm/thread/src/thread_native_basic.c index 9c836dd..e9f94d4 100644 --- a/vm/thread/src/thread_native_basic.c +++ b/vm/thread/src/thread_native_basic.c @@ -282,38 +282,120 @@ void VMCALL hythread_detach(hythread_t thread) { assert(status == TM_ERROR_NONE); } +// joining a thread is tricky, as the thread will attempt to +// recycle its thread block as soon as its terminates, possibly +// even simultaneously with us running join(). +// +// The following algorithm makes this operation correct +// (1) the caller of join() function must make sure +// that a joined thread is still alive when join() is called. +// +// (a) by grabbing the global thread lock the caller ensures +// that no threads are terminated or created as long as +// the global thread lock is hold +// +// (b) if the thread is found during thread group iteration, +// it means that the thread is still active, so is eligible +// for joining +// +// (c) join() must be called while still holding the global +// thread lock. +// +// (2) join() must not hold global thread lock for a long time +// +// (d) join() asserts that the passed in thread is indeed +// attached to some thread group, which means that it is +// still active (debug build only) +// +// (e) join() releases the global thread lock, in order to prevent +// system deadlocks, e.g. if other thread is going to do a garbage +// collection while we are waiting for a thread +// +// Here we have a race condition, as the thread can destroy +// its thread block before we get to wait on the join_event. +// +// (f) join() waits on a join_event, which is kept in the thread block +// +// (g) join() re-acquires global thread lock to return to caller +// in the same state as on join() entry. +// +static IDATA join_thread(hythread_t thread, I_64 millis, IDATA nanos, + IDATA interruptable) +{ + int r; + +#ifndef NDEBUG + // assert that the thread is still attached to some group + hythread_iterator_t iterator; + iterator = hythread_iterator_create(thread->group); + r = -1; // we haven't found thread in the group yet + while (hythread_iterator_has_next(iterator)) { + if (hythread_iterator_next(&iterator) == thread) { + r = 0; + break; + } + } + hythread_iterator_release(&iterator); + assert(r == 0 && "hythread_join() was passed a non-attached thread"); +#endif + + // release a global lock while we are waiting + r = hythread_global_unlock(); + assert(r == 0 && "hythread_join() must be called under thread lock"); + + // there is a race here, as a thread may destroy its thread block + // before we got to wait on a latch + + // wait for a thread + r = hylatch_wait_internal(thread->join_event, millis, nanos, interruptable); + + // NB after we waited for a thread, the thread block pointer + // is not guaranteed to be valid anymore, and the thread block + // memory is likely to be reclaimed, so must not be used. + + // re-acquire global thread lock + hythread_global_lock(); + return r; +} + /** * Waits until the selected thread finishes execution. * - * @param[in] t thread to join + * Must be called with global lock being held to prevent + * reclamation of the thread block while this function is working. + * Caller must ensure that the joined thread is still alive. + * + * @param[in] thread thread to join */ -IDATA VMCALL hythread_join(hythread_t t) { - return hylatch_wait(t->join_event); +IDATA VMCALL hythread_join(hythread_t thread) { + return join_thread(thread, 0, 0, WAIT_NONINTERRUPTABLE); } + + /** * Waits until the selected thread finishes with specific timeout. * - * @param[in] t a thread to wait for + * @param[in] thread a thread to wait for * @param[in] millis timeout in milliseconds to wait * @param[in] nanos timeout in nanoseconds to wait * @return TM_THREAD_TIMEOUT or 0 in case thread * was successfully joined. */ -IDATA VMCALL hythread_join_timed(hythread_t t, I_64 millis, IDATA nanos) { - return hylatch_wait_timed(t->join_event, millis, nanos); +IDATA VMCALL hythread_join_timed(hythread_t thread, I_64 millis, IDATA nanos) { + return join_thread(thread, millis, nanos, WAIT_NONINTERRUPTABLE); } /** * Waits until the selected thread finishes with specific timeout. * - * @param[in] t a thread to wait for + * @param[in] thread a thread to wait for * @param[in] millis timeout in milliseconds to wait * @param[in] nanos timeout in nanoseconds to wait * @return TM_THREAD_TIMEOUT or TM_THREAD_INTERRUPTED or 0 in case thread * was successfully joined. */ -IDATA VMCALL hythread_join_interruptable(hythread_t t, I_64 millis, IDATA nanos) { - return hylatch_wait_interruptable(t->join_event, millis, nanos); +IDATA VMCALL hythread_join_interruptable(hythread_t thread, I_64 millis, IDATA nanos) { + return join_thread(thread, millis, nanos, WAIT_INTERRUPTABLE); } /** @@ -720,28 +802,30 @@ static int VMAPICALL thread_start_proc(void *arg) { // Also, should it be executed under TM global lock? status = hythread_set_priority(thread, thread->priority); //assert(status == TM_ERROR_NONE);//now we down - fixme + hymutex_lock(&thread->mutex); thread->state |= TM_THREAD_STATE_RUNNABLE; + hymutex_unlock(&thread->mutex); // Do actual call of the thread body supplied by the user. start_proc(data); - // Shutdown sequence. - status = hythread_global_lock(NULL); - assert(status == TM_ERROR_NONE); + // Shutdown sequence. -------------- assert(hythread_is_suspend_enabled()); - thread->state = TM_THREAD_STATE_TERMINATED | (TM_THREAD_STATE_INTERRUPTED & thread->state); - thread->exit_value = 0; + // removes thread from thread list under global thread lock hythread_detach(thread); // Send join event to those threads who called join on this thread. hylatch_count_down(thread->join_event); + hymutex_lock(&thread->mutex); + thread->state = TM_THREAD_STATE_TERMINATED | (TM_THREAD_STATE_INTERRUPTED & thread->state); + hymutex_unlock(&thread->mutex); + + thread->exit_value = 0; + free_thread(thread); - status = hythread_global_unlock(NULL); - assert(status == TM_ERROR_NONE); - return 0; } diff --git a/vm/thread/src/thread_native_latch.c b/vm/thread/src/thread_native_latch.c index fdcfeab..7a31f00 100644 --- a/vm/thread/src/thread_native_latch.c +++ b/vm/thread/src/thread_native_latch.c @@ -64,7 +64,7 @@ cleanup: //wait method implementation //// -static IDATA latch_wait_impl(hylatch_t latch, I_64 ms, IDATA nano, IDATA interruptable) { +IDATA hylatch_wait_internal(hylatch_t latch, I_64 ms, IDATA nano, IDATA interruptable) { IDATA status; status = hymutex_lock(&latch->mutex); @@ -94,7 +94,7 @@ static IDATA latch_wait_impl(hylatch_t latch, I_64 ms, IDATA nano, IDATA interru * TM_NO_ERROR on success */ IDATA VMCALL hylatch_wait(hylatch_t latch) { - return latch_wait_impl(latch, 0, 0, WAIT_NONINTERRUPTABLE); + return hylatch_wait_internal(latch, 0, 0, WAIT_NONINTERRUPTABLE); } /** * Instructs the current thread to wait until the latch is opened or @@ -108,7 +108,7 @@ IDATA VMCALL hylatch_wait(hylatch_t latch) { * TM_NO_ERROR on success */ IDATA VMCALL hylatch_wait_timed(hylatch_t latch, I_64 ms, IDATA nano) { - return latch_wait_impl(latch, ms, nano, WAIT_NONINTERRUPTABLE); + return hylatch_wait_internal(latch, ms, nano, WAIT_NONINTERRUPTABLE); } /** @@ -124,7 +124,7 @@ IDATA VMCALL hylatch_wait_timed(hylatch_t latch, I_64 ms, IDATA nano) { * TM_THREAD_INTERRUPTED in case thread was interrupted during wait. */ IDATA VMCALL hylatch_wait_interruptable(hylatch_t latch, I_64 ms, IDATA nano) { - return latch_wait_impl(latch, ms, nano, WAIT_INTERRUPTABLE); + return hylatch_wait_internal(latch, ms, nano, WAIT_INTERRUPTABLE); } /** diff --git a/vm/thread/src/thread_private.h b/vm/thread/src/thread_private.h index d1313a4..6125d00 100644 --- a/vm/thread/src/thread_private.h +++ b/vm/thread/src/thread_private.h @@ -611,6 +611,9 @@ UDATA array_add(array_t array, void *value); void *array_delete(array_t array, UDATA index); void *array_get(array_t array, UDATA index); +IDATA hylatch_wait_internal(hylatch_t latch, I_64 ms, IDATA nano, + IDATA interruptable); + /** * Auxiliary function to update thread count */ diff --git a/vm/vmcore/src/init/vm_shutdown.cpp b/vm/vmcore/src/init/vm_shutdown.cpp index d16c9d1..f41c4cb 100644 --- a/vm/vmcore/src/init/vm_shutdown.cpp +++ b/vm/vmcore/src/init/vm_shutdown.cpp @@ -95,7 +95,6 @@ static void vm_shutdown_callback() { */ static void vm_shutdown_stop_java_threads(Global_Env * vm_env) { hythread_t self; - hythread_t * running_threads; hythread_t native_thread; hythread_iterator_t it; VM_thread *vm_thread; @@ -106,25 +105,41 @@ static void vm_shutdown_stop_java_threads(Global_Env * vm_env) { // Set callbacks to let threads exit TRACE2("shutdown", "stopping threads, self " << self); it = hythread_iterator_create(NULL); - running_threads = (hythread_t *)apr_palloc(vm_env->mem_pool, - hythread_iterator_size(it) * sizeof(hythread_t)); - int size = 0; while(native_thread = hythread_iterator_next(&it)) { vm_thread = get_vm_thread(native_thread); if (native_thread != self && vm_thread != NULL) { + TRACE2("shutdown", "setting shutdown callback for " << native_thread); hythread_set_safepoint_callback(native_thread, vm_shutdown_callback); - running_threads[size] = native_thread; - ++size; } } hythread_iterator_release(&it); TRACE2("shutdown", "joining threads"); - // join running threads - // blocked and waiting threads won't be joined - for (int i = 0; i < size; i++) { - hythread_join_timed(running_threads[i], 100/size + 1, 0); + // Join running threads. Blocked and waiting threads won't be joined. + // Holding a thread lock while joining threads has following consequences: + // * a thread attempting to terminate will block on a thread lock + // in a thread termination sequence + // * a new thread creation attempt will block on a thread lock + // + // FIXME: Handle the above cases + // FIXME: Somehow limit the maximum total waiting time + int retries = 5; + while (--retries > 0) { + it = hythread_iterator_create(NULL); + + // the only remaining thread is current, + // no need to wait for anybody else + if (hythread_iterator_size(it) <= 1) break; + + while ((native_thread = hythread_iterator_next(&it)) + && native_thread != self) { + TRACE2("shutdown", "joining " << native_thread); + hythread_join_timed(native_thread, 10, 0); + // we should restart iteration after join attempt + break; + } + hythread_iterator_release(&it); } TRACE2("shutdown", "cancelling threads"); -- 1.4.1.g4455