// // mp::async // // Copyright (C) 2008 FURUHASHI Sadayuki // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #ifndef MP_ASYNC_IMPL_H__ #define MP_ASYNC_IMPL_H__ #include "mp/async.h" #include "mp/object_callback.h" #include #include #include #include namespace mp { async::async(unsigned int num_threads) : m_end_flag(0) { if( fcntl(m_event_pipe.readfd(), F_SETFL, O_NONBLOCK) < 0 ) { throw std::runtime_error("can't set nonblock"); } m_threads.resize(num_threads); for(threads_type::iterator it(m_threads.begin()), it_end(m_threads.end()); it != it_end; ++it) { if( pthread_create(&*it, NULL, &async::thread_proc, this) != 0 ) { m_end_flag = true; throw std::runtime_error("can't create thread"); } } // FIXME } async::~async() { m_end_flag = true; size_type notify_buffer[MAX_SLOT]; memset(notify_buffer, 0, sizeof(notify_buffer)); const char* p = (const char*)notify_buffer; const char* const endp = p + sizeof(notify_buffer); do { const ssize_t n = ::write(m_thread_pipe.writefd(), p, endp - p); if(n <= 0) { return; } // FIXME p += n; } while (p< endp); // FIXME join? } async::pipe_pair::pipe_pair() { if(pipe(m) < 0) { // FIXME throw std::runtime_error("can't create a pair of pipe"); } } async::pipe_pair::~pipe_pair() { ::close(m[0]); ::close(m[1]); } template void async::add(void (*callback)(void* user, Handler* target, state_t state), void* user) { add_impl(callback, user, new Handler()); } MP_ARGS_BEGIN template void async::add(void (*callback)(void* user, Handler* terget, state_t state), void* user, MP_ARGS_PARAMS) { add_impl(callback, user, new Handler(MP_ARGS_FUNC)); } MP_ARGS_END template void async::add_impl(void (*callback)(void*, Handler*, state_t), void* callback_object, Handler* obj) { for(slot_t *it(m_slots+1), *it_end(m_slots+MAX_SLOT); it != it_end; // id == 0 means ping ++it) { if(it->empty()) { it->set(callback, callback_object, obj); size_type id = it - m_slots; if(::write(m_thread_pipe.writefd(), &id, 1) < 0) { throw std::runtime_error("pipe borken"); } return; } } delete obj; } async::slot_t::slot_t() : m_handler(NULL) { } async::slot_t::~slot_t() { clear(); } template void async::slot_t::set(void (*callback)(void*, Handler*, state_t), void* callback_object, Handler* target) { m_callback = reinterpret_cast(callback); m_callback_object = reinterpret_cast(callback_object); m_handler = target; m_finalizer = &slot_t::destructor_caller; m_operator = &object_callback::mem_fun; } void async::slot_t::clear() { if(m_handler) { (*m_finalizer)(m_handler); m_handler = NULL; } } template void async::slot_t::destructor_caller(void *obj) { delete reinterpret_cast(obj); } void async::slot_t::call(accessor& a) { (*m_operator)(m_handler, a); } bool async::slot_t::callback() { state_t st_save = state; (*m_callback)(m_callback_object, m_handler, st_save); return st_save == STATE_RUNNING; } void* async::thread_proc(void* obj) { reinterpret_cast(obj)->thread_proc_impl(); return NULL; } inline void async::thread_proc_impl() { size_type id; while(!m_end_flag) { ::read(m_thread_pipe.readfd(), &id, 1); if(id == 0) { continue; } // id == 0 means ping slot_t& slot( m_slots[id] ); accessor a(*this, id, slot); slot.state = STATE_RUNNING; try { slot.call(a); } catch (...) { slot.state = STATE_FAILED; a.callback(); continue; } slot.state = STATE_EXITED; a.callback(); } } async::accessor::accessor(async& self, size_type id, slot_t& slot) : m_self(self), m_id(id), m_slot(slot) { } async::accessor::~accessor() { } void async::accessor::callback() { if( ::write(m_self.m_event_pipe.writefd(), &m_id, 1) < 0 ) { // FIXME //throw std::runtime_error("write failed"); } } void async::process() { size_type buf[128]; ssize_t n = ::read(m_event_pipe.readfd(), buf, sizeof(buf)); if(n < 0) { if(errno == EAGAIN || errno == EINTR) { return; } else { throw std::runtime_error("pipe broken"); } } else if(n == 0) { throw std::runtime_error("pipe broken"); } for(unsigned int i=0; i < static_cast(n); ++i) { size_type id = buf[i]; slot_t& slot( m_slots[id] ); if( !slot.callback() ) { m_slots[id].clear(); } // FIXME 途中 } } int async::getfd() { return m_event_pipe.readfd(); } } // namespace mp #include "mp/async_impl.h" #endif /* mp/async.h */