// // mp::multiplex // // Copyright (C) 2008 FURUHASHI Sadayuki // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #ifndef MP_MULTIPLEX_IMPL_H__ #define MP_MULTIPLEX_IMPL_H__ #include #include #include #include namespace mp { namespace multiplex { handler::handler(int fd) : m_fd(fd) {} handler::~handler() {} // FIXME std::auto_ptr manager::s_instance; void manager::initialize() { s_instance.reset(new manager()); } void manager::destroy() { s_instance.reset(NULL); } manager::manager() : m_zone(m_source), m_end_flag(0) { } manager::~manager() {} manager::cb_t::cb_t(handler* h, short ev) : m_handler(h), m_event(ev), m_wbuffer((char*)malloc(2048)), m_allocated(2048), m_used(0) { if(!m_wbuffer) { throw std::bad_alloc(); } } manager::cb_t::~cb_t() { free(m_wbuffer); } short manager::cb_t::event() const { return m_event; } void manager::cb_t::event(short ev) { m_event = ev; } handler& manager::cb_t::get() { return *m_handler; } // FIXME poor implementation void manager::cb_t::append_buffer(const char* buf, size_t len) { while(m_allocated - m_used < len) { size_t nsize = m_allocated*2; char* tmp = (char*)realloc(m_wbuffer, nsize); if(!tmp) { throw std::bad_alloc(); } m_wbuffer = tmp; m_allocated = nsize; } ::memcpy(m_wbuffer+m_used, buf, len); m_used += len; } void manager::cb_t::consume_buffer(size_t len) { if(len == m_used) { m_used = 0; } else { ::memmove(m_wbuffer, m_wbuffer+len, m_used-len); } } const char* manager::cb_t::wbuffer() const { return m_wbuffer; } size_t manager::cb_t::length() const { return m_used; } bool manager::cb_t::empty() const { return m_used == 0; } template void manager::add(int fd) { instance().add_impl(fd); } MP_ARGS_BEGIN template void manager::add(int fd, MP_ARGS_PARAMS) { instance().add_impl(fd, MP_ARGS_FUNC); } MP_ARGS_END template void manager::add_impl(int fd) { // FIXME sparse_array handler* h = m_zone.allocate(fd); m_ev.add(fd, EV_READ, h, EV_READ); h->connected(); } MP_ARGS_BEGIN template void manager::add_impl(int fd, MP_ARGS_PARAMS) { // FIXME sparse_array handler* h = m_zone.allocate(fd, MP_ARGS_FUNC); m_ev.add(fd, EV_READ, h, EV_READ); h->connected(); } MP_ARGS_END handler& manager::data(int fd) { return instance().data_impl(fd); } handler& manager::data_impl(int fd) { return m_ev.data(fd).get(); } void manager::send_data(int fd, const char* buf, size_t len) { instance().send_data_impl(fd, buf, len); } void manager::send_data_impl(int fd, const char* buf, size_t len) { cb_t& cb( m_ev.data(fd) ); cb.append_buffer(buf, len); m_wswitch_ctx.push_back(&cb); } void manager::end() { instance().m_end_flag = 1; } bool manager::is_end() { return instance().m_end_flag == 1; } int manager::run() { return instance().run_impl(); } int manager::run_impl() { cb_t* pcb; int fd; short event; int ret; while(!m_end_flag) { while( m_ev.next(&fd, &event, &pcb) ) { if( event & EV_READ ) { try_read(*pcb); } if( event & EV_WRITE ) { try_write(*pcb); } } for(wswitch_ctx_t::iterator it(m_wswitch_ctx.begin()), it_end(m_wswitch_ctx.end()); it != it_end; ++it) { if( try_write(*pcb) ) { wswitch(*pcb); } } m_wswitch_ctx.clear(); if( (ret = m_ev.wait()) < 0 ) { return ret; } } return 0; } void manager::try_read(cb_t& cb) { try { cb.get().read_event(); } catch (...) { // FIXME log close_connection(cb); } } bool manager::try_write(cb_t& cb) { if(cb.length() == 0) { return false; } ssize_t len = ::write(cb.get().fd(), cb.wbuffer(), cb.length()); if(len < 0) { if(errno == EAGAIN || errno == EINTR) { return true; } else { close_connection(cb); return false; } } else if(len == 0) { close_connection(cb); return false; } cb.consume_buffer(len); if(cb.empty()) { rswitch(cb); return false; } else { return true; } } void manager::rswitch(cb_t& cb) { if( cb.event() & EV_WRITE != 0 ) { m_ev.modify(cb.get().fd(), EV_READ|EV_WRITE, EV_READ); cb.event(EV_READ); } } void manager::wswitch(cb_t& cb) { if( cb.event() & EV_WRITE == 0 ) { m_ev.modify(cb.get().fd(), EV_READ, EV_READ|EV_WRITE); cb.event(EV_READ|EV_WRITE); } } void manager::close_connection(cb_t& cb) { int fd = cb.get().fd(); // FIXME remove handler form sparse_array m_ev.remove(fd, cb.event()); ::close(fd); } // FIXME // static void manager::send_message(int fd, callback_message* msg) // { // } // } // namespace multiplex } // namespace mp #endif /* mp/multiplex_impl.h */