// // mp::async // // Copyright (C) 2008 FURUHASHI Sadayuki // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #ifndef MP_ASYNC_H__ #define MP_ASYNC_H__ #include #include namespace mp { class async { public: typedef uint8_t size_type; typedef enum { STATE_RUNNING, STATE_FAILED, STATE_EXITED, } state_t; private: class slot_t; public: class accessor { public: accessor(async& self, size_type id, slot_t& slot); ~accessor(); public: void callback(); private: async& m_self; size_type m_id; slot_t& m_slot; }; class handler { public: handler() { } virtual ~handler() { } virtual void operator() (accessor& a) = 0; public: bool exited() const { return m_state == STATE_EXITED || failed(); } bool failed() const { return m_state == STATE_FAILED; } private: state_t m_state; }; public: async(unsigned int num_threads = 5); ~async(); public: template void add(void (*callback)(void* user, Handler* target, state_t state), void* user); MP_ARGS_BEGIN template void add(void (*callback)(void* user, Handler* terget, state_t state), void* user, MP_ARGS_PARAMS); MP_ARGS_END void process(); int getfd(); private: class pipe_pair { public: pipe_pair(); ~pipe_pair(); public: int readfd() const { return m[0]; } int writefd() const { return m[1]; } private: int m[2]; private: pipe_pair(const pipe_pair&); }; pipe_pair m_thread_pipe; pipe_pair m_event_pipe; typedef std::vector threads_type; threads_type m_threads; class slot_t { public: slot_t(); ~slot_t(); public: bool empty() const { return m_handler == NULL; } template void set(void (*callback)(void*, Handler*, state_t), void* callback_object, Handler* target); void clear(); void call(accessor& a); bool callback(); private: void* m_callback_object; void (*m_callback)(void*, void*, state_t); void* m_handler; void (*m_operator)(void*, accessor&); void (*m_finalizer)(void*); public: volatile state_t state; private: template static void destructor_caller(void* obj); private: slot_t(const slot_t&); }; static const size_t MAX_SLOT = 256; slot_t m_slots[256]; private: static void* thread_proc(void* obj); void thread_proc_impl(); private: template void add_impl(void (*callback)(void*, Handler*, state_t), void* callback_object, Handler* obj); private: volatile sig_atomic_t m_end_flag; }; } // namespace mp #include "mp/async_impl.h" #endif /* mp/async.h */