// // mp::buffer // // Copyright (C) 2008 FURUHASHI Sadayuki // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #ifndef MP_BUFFER_IMPL_H__ #define MP_BUFFER_IMPL_H__ #include #include #include #ifdef MP_T_OBJPOOL #undef MP_T_OBJPOOL #endif #define MP_T_OBJPOOL static_cast(object_pool()) namespace mp { template std::auto_ptr< buffer > buffer::s_instance; template void buffer::initialize() { s_instance.reset(new buffer()); } template void buffer::destroy() { s_instance.reset(NULL); } template buffer::buffer() : m_end_flag(false), m_write_shortcut(-1) { } template buffer::~buffer() { } template template int buffer::add_event(int fd, short event) { event_handler* obj = MP_T_OBJPOOL.construct(); return instance().add_impl(fd, event, obj); } MP_ARGS_BEGIN template template int buffer::add_event(int fd, short event, MP_ARGS_PARAMS) { event_handler* obj = MP_T_OBJPOOL.construct(MP_ARGS_FUNC); return instance().add_impl(fd, event, obj); } MP_ARGS_END template template int buffer::add(int fd) { stream_handler* obj = MP_T_OBJPOOL.construct(fd); return instance().add_impl(fd, obj); } MP_ARGS_BEGIN template template int buffer::add(int fd, MP_ARGS_PARAMS) { stream_handler* obj = MP_T_OBJPOOL.construct(fd, MP_ARGS_FUNC); return instance().add_impl(fd, obj); } MP_ARGS_END template template int buffer::add_connect(int fd, struct sockaddr* addr, socklen_t addrlen) { stream_handler* obj = MP_T_OBJPOOL.construct(fd); return instance().add_impl(fd, obj); } MP_ARGS_BEGIN template template int buffer::add_connect(int fd, struct sockaddr* addr, socklen_t addrlen, MP_ARGS_PARAMS) { stream_handler* obj = MP_T_OBJPOOL.construct(fd, MP_ARGS_FUNC); return instance().add_impl(fd, obj); } MP_ARGS_END template int buffer::add_impl(int fd, short event, event_handler* obj) { return m_event.add(fd, event, obj, event); } template int buffer::add_impl(int fd, stream_handler* obj) { return m_event.add(fd, EV_READ, obj); } template typename buffer::stream* buffer::allocate_stream() { return MP_T_OBJPOOL.construct(); } template void buffer::destroy_stream(stream* s) { MP_T_OBJPOOL.destroy(s); } template buffer::cb_t::cb_t(event_handler* obj, short event) : connection_schedule(SCHED_CONTINUE), m_obj(obj), m_event(event), m_instream(NULL), m_outq(NULL) { } template buffer::cb_t::cb_t(stream_handler* obj) : connection_schedule(SCHED_CONTINUE), m_obj(obj), m_event(EV_READ), m_instream( MP_T_OBJPOOL.construct() ), m_outq( MP_T_OBJPOOL.construct() ) { } template buffer::cb_t::~cb_t() { MP_T_OBJPOOL.destroy(m_obj); if(m_instream) { MP_T_OBJPOOL.destroy(m_instream); } if(m_outq) { MP_T_OBJPOOL.destroy(m_outq); } } template typename buffer::event_handler& buffer::cb_t::event_object() { return *reinterpret_cast(m_obj); } template typename buffer::stream_handler& buffer::cb_t::stream_object() { return *reinterpret_cast(m_obj); } template short buffer::cb_t::event() const { return m_event; } template void buffer::cb_t::event(short new_event) { m_event = new_event; } template bool buffer::cb_t::is_event_handler() const { return m_instream == NULL; } template typename buffer::stream& buffer::cb_t::instream() { return *m_instream; } template typename buffer::stream& buffer::cb_t::outqueue() { return *m_outq; } template void buffer::end_impl() { m_end_flag = true; } template typename buffer::object_pool_t& buffer::object_pool() { static object_pool_t object; return object; } template int buffer::run_impl() { cb_t* pcb; int fd; short event; int ret; while(!m_end_flag) { while( m_event.next(&fd, &event, &pcb) ) { if(pcb->is_event_handler()) { pcb->event_object()(fd, event); } else { if( event & EV_READ ) { buffer_read(fd, *pcb); } if( m_write_shortcut == fd || event & EV_WRITE ) { buffer_write(fd, *pcb); m_write_shortcut = -1; } } } if( (ret = m_event.wait()) < 0 ) { return ret; } } return 0; } template void buffer::buffer_read(int fd, cb_t& cb) { stream& s(cb.instream()); s.reserve_space(1024); //std::cout << "space " << s.space() << std::endl; ssize_t len = ::read(fd, s.end(), s.space()); //std::cout << "buffer_read " << len << " " << strerror(errno) << std::endl; if(len <= 0) { if(len == 0) { unbind_connection(fd, cb); return; } else if(errno == EAGAIN || errno == EINTR) { return; } else { unbind_connection(fd, cb); return; } } s.produced(len); cb.stream_object().receive_data(s, len); } template void buffer::buffer_write(int fd, cb_t& cb) { stream& s(cb.outqueue()); // FIXME sendfile ssize_t len = ::write(fd, s.begin(), s.size()); if(len <= 0) { if(len == 0) { unbind_connection(fd, cb); return; } else if(errno == EAGAIN || errno == EINTR) { return; } else { unbind_connection(fd, cb); return; } } // FIXME vwrite s.consumed(len); rswitch(fd, cb); } template void buffer::send_data(int fd, const void* buf, size_t len) { cb_t& cb(m_event.data(fd)); stream& oq(cb.outqueue()); oq.reserve_space(len); ::memcpy(oq.end(), buf, len); oq.produced(len); wswitch(fd, cb); } template void buffer::send_data(int fd, const void* begin, const void* end) { send_data(fd, begin, static_cast(end) - static_cast(begin)); } template void buffer::send_data(int fd, stream& buffer) { cb_t& cb(m_event.data(fd)); stream& oq(cb.outqueue()); if(oq.empty()) { buffer.swap(oq); } else { oq.reserve_space(buffer.size()); ::memcpy(oq.end(), buffer.begin(), buffer.size()); oq.produced(buffer.size()); buffer.clear(); } wswitch(fd, cb); } template void buffer::close_connection(int fd) { cb_t& cb(m_event.data(fd)); m_event.remove(fd, cb.event()); } template void buffer::close_connection_after_writing(int fd) { cb_t& cb(m_event.data(fd)); cb.connection_schedule = cb_t::SCHED_CLOSE; } template void buffer::shutdown_read(int fd) { ::shutdown(fd, SHUT_RD); } template void buffer::shutdown_write(int fd) { ::shutdown(fd, SHUT_WR); rswitch(); } template void buffer::shutdown_write_after_writing(int fd) { cb_t& cb(m_event.data(fd)); cb.connection_schedule = cb_t::SCHED_SHUTDOWN_WRITE; } template void buffer::rswitch(int fd, cb_t& cb) { if( !cb.instream().empty() ) { return; } if( cb.connection_schedule != cb_t::SCHED_CONTINUE ) { if( cb.connection_schedule == cb_t::SCHED_SHUTDOWN_WRITE ) { ::shutdown(fd, SHUT_RD); } else { m_event.remove(fd, cb.event()); } } else { if( m_event.modify(fd, cb.event(), EV_READ) < 0 ) { unbind_connection(fd, cb); } cb.event(EV_READ); } } template void buffer::wswitch(int fd, cb_t& cb) { if( cb.event() & EV_WRITE ) { return; } if( m_event.modify(fd, cb.event(), EV_READ|EV_WRITE) < 0 ) { unbind_connection(fd, cb); } cb.event(EV_READ|EV_WRITE); m_write_shortcut = fd; } template void buffer::unbind_connection(int fd, cb_t& cb) { //if(!cb.is_event_handler()) { // cb.stream_object().unbind(); //} m_event.remove(fd, cb.event()); } } // namespace mp #undef MP_T_OBJPOOL #endif /* mp/buffer_impl.h */