Index: src/test/cpp/helpers/threadtestcase.cpp =================================================================== --- src/test/cpp/helpers/threadtestcase.cpp (revision 0) +++ src/test/cpp/helpers/threadtestcase.cpp (revision 0) @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include "../insertwide.h" +#include "../logunit.h" +#include +#include + +using namespace log4cxx; +using namespace log4cxx::helpers; + + +/** + Unit test for Thread. + + */ +LOGUNIT_CLASS(ThreadTestCase) { + LOGUNIT_TEST_SUITE(ThreadTestCase); + LOGUNIT_TEST(testInterrupt); + LOGUNIT_TEST_SUITE_END(); + + public: + /** + * Start a thread that will wait for a minute, then interrupt it. + */ + void testInterrupt() { + Thread thread1; + bool interrupted = false; + thread1.run(sleep, &interrupted); + apr_sleep(100000); + apr_time_t start = apr_time_now(); + thread1.interrupt(); + thread1.join(); + LOGUNIT_ASSERT_EQUAL(true, interrupted); + apr_time_t elapsed = apr_time_now() - start; + LOGUNIT_ASSERT(elapsed < 1000000); + } + +private: + static void* LOG4CXX_THREAD_FUNC sleep(apr_thread_t* thread, void* data) { + try { + Thread::sleep(60000); + } catch(InterruptedException& ex) { + *(reinterpret_cast(data)) = true; + } + return NULL; + } + +}; + +LOGUNIT_TEST_SUITE_REGISTRATION(ThreadTestCase); + Index: src/test/cpp/Makefile.am =================================================================== --- src/test/cpp/Makefile.am (revision 654826) +++ src/test/cpp/Makefile.am (working copy) @@ -53,6 +53,7 @@ helpers/stringtokenizertestcase.cpp \ helpers/stringhelpertestcase.cpp \ helpers/syslogwritertest.cpp \ + helpers/threadtestcase.cpp \ helpers/timezonetestcase.cpp \ helpers/transcodertestcase.cpp Index: src/main/cpp/socketappenderskeleton.cpp =================================================================== --- src/main/cpp/socketappenderskeleton.cpp (revision 654848) +++ src/main/cpp/socketappenderskeleton.cpp (working copy) @@ -155,6 +155,10 @@ } return NULL; } + catch(InterruptedException&) { + LogLog::debug(LOG4CXX_STR("Connector interrupted. Leaving loop.")); + return NULL; + } catch(ConnectException&) { LogLog::debug(LOG4CXX_STR("Remote host ") Index: src/main/cpp/synchronized.cpp =================================================================== --- src/main/cpp/synchronized.cpp (revision 654826) +++ src/main/cpp/synchronized.cpp (working copy) @@ -37,6 +37,18 @@ #endif } +synchronized::synchronized(apr_thread_mutex_t* mutex1) +: mutex(mutex1) +{ +#if APR_HAS_THREADS + apr_status_t stat = apr_thread_mutex_lock( + (apr_thread_mutex_t*) this->mutex); + if (stat != APR_SUCCESS) { + throw MutexException(stat); + } +#endif +} + synchronized::~synchronized() { #if APR_HAS_THREADS Index: src/main/cpp/threadcxx.cpp =================================================================== --- src/main/cpp/threadcxx.cpp (revision 654826) +++ src/main/cpp/threadcxx.cpp (working copy) @@ -22,6 +22,8 @@ #include #include #include +#include +#include using namespace log4cxx::helpers; using namespace log4cxx; @@ -135,7 +137,8 @@ -Thread::Thread() : thread(NULL), alive(0), interruptedStatus(0) { +Thread::Thread() : thread(NULL), alive(0), interruptedStatus(0), + interruptedMutex(NULL), interruptedCondition(NULL) { } Thread::~Thread() { @@ -159,6 +162,16 @@ throw ThreadException(stat); } + stat = apr_thread_cond_create(&interruptedCondition, p.getAPRPool()); + if (stat != APR_SUCCESS) { + throw ThreadException(stat); + } + stat = apr_thread_mutex_create(&interruptedMutex, APR_THREAD_MUTEX_NESTED, + p.getAPRPool()); + if (stat != APR_SUCCESS) { + throw ThreadException(stat); + } + // create LaunchPackage on the thread's memory pool LaunchPackage* package = new(p) LaunchPackage(this, start, data); stat = apr_thread_create(&thread, attrs, @@ -199,6 +212,13 @@ void Thread::interrupt() { apr_atomic_set32(&interruptedStatus, 0xFFFFFFFF); +#if APR_HAS_THREADS + if (interruptedMutex != NULL) { + synchronized sync(interruptedMutex); + apr_status_t stat = apr_thread_cond_signal(interruptedCondition); + if (stat != APR_SUCCESS) throw ThreadException(stat); + } +#endif } bool Thread::interrupted() { @@ -234,8 +254,25 @@ if(interrupted()) { throw InterruptedException(); } -#endif if (duration > 0) { + Thread* pThis = (Thread*) getThreadLocal().get(); + if (pThis == NULL) { + apr_sleep(duration*1000); + } else { + synchronized sync(pThis->interruptedMutex); + apr_status_t stat = apr_thread_cond_timedwait(pThis->interruptedCondition, + pThis->interruptedMutex, duration*1000); + if (stat != APR_SUCCESS && !APR_STATUS_IS_TIMEUP(stat)) { + throw ThreadException(stat); + } + if (interrupted()) { + throw InterruptedException(); + } + } + } +#else + if (duration > 0) { apr_sleep(duration*1000); } +#endif } Index: src/main/include/log4cxx/helpers/thread.h =================================================================== --- src/main/include/log4cxx/helpers/thread.h (revision 654826) +++ src/main/include/log4cxx/helpers/thread.h (working copy) @@ -31,6 +31,8 @@ extern "C" { typedef struct apr_thread_t apr_thread_t; + typedef struct apr_thread_cond_t apr_thread_cond_t; + typedef struct apr_thread_mutex_t apr_thread_mutex_t; } @@ -100,6 +102,8 @@ apr_thread_t* thread; volatile unsigned int alive; volatile unsigned int interruptedStatus; + apr_thread_mutex_t* interruptedMutex; + apr_thread_cond_t* interruptedCondition; Thread(const Thread&); Thread& operator=(const Thread&); friend void* LOG4CXX_THREAD_FUNC ThreadLaunch::launcher(apr_thread_t* thread, void* data); Index: src/main/include/log4cxx/helpers/synchronized.h =================================================================== --- src/main/include/log4cxx/helpers/synchronized.h (revision 654826) +++ src/main/include/log4cxx/helpers/synchronized.h (working copy) @@ -19,6 +19,10 @@ #define _LOG4CXX_HELPERS_SYNCHRONIZED_H #include +extern "C" { + typedef struct apr_thread_mutex_t apr_thread_mutex_t; +} + namespace log4cxx { namespace helpers { @@ -29,6 +33,7 @@ { public: synchronized(const Mutex& mutex); + synchronized(apr_thread_mutex_t* mutex); ~synchronized(); Index: src/main/include/log4cxx/net/socketappenderskeleton.h =================================================================== --- src/main/include/log4cxx/net/socketappenderskeleton.h (revision 654826) +++ src/main/include/log4cxx/net/socketappenderskeleton.h (working copy) @@ -162,7 +162,7 @@ reconnectionDelay milliseconds.

It stops trying whenever a connection is established. It will - restart to try reconnect to the server when previpously open + restart to try reconnect to the server when previously open connection is droppped. */