// // mp::buffer // // Copyright (C) 2008 FURUHASHI Sadayuki // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #ifndef MP_BUFFER_H__ #define MP_BUFFER_H__ #include #include #include "mp/event.h" #include "mp/utility.h" #include "mp/mempool.h" #include "mp/byte_array.h" #include #ifndef MP_BUFFER_MINIMUM_READ_BUFFER_SIZE #define MP_BUFFER_MINIMUM_READ_BUFFER_SIZE 1024 #endif #ifndef MP_BUFFER_ESTIMATED_OBJECT_SIZE #define MP_BUFFER_ESTIMATED_OBJECT_SIZE 1024 #endif #ifndef MP_BUFFER_OPTIMAL_CLIENT_NUMBER #define MP_BUFFER_OPTIMAL_CLIENT_NUMBER 128 #endif namespace mp { static const size_t BUFFER_MINIMUM_READ_BUFFER_SIZE = MP_BUFFER_MINIMUM_READ_BUFFER_SIZE; static const size_t BUFFER_ESTIMATED_OBJECT_SIZE = MP_BUFFER_ESTIMATED_OBJECT_SIZE; static const size_t BUFFER_OPTIMAL_CLIENT_NUMBER = MP_BUFFER_OPTIMAL_CLIENT_NUMBER; template class buffer { public: static void initialize(); static void destroy(); ~buffer(); typedef buffer this_t; typedef byte_array > stream; private: struct handler { virtual ~handler() {} }; public: struct stream_handler : public handler { public: stream_handler(int fd) : m_fd(fd) { } virtual ~stream_handler() { ::close(m_fd); } //! This callback function will be called when data reached. virtual void receive_data(stream& s, size_t len) = 0; public: void send_data(const void* buf, size_t len) { this_t::instance().send_data(m_fd, buf, len); } void send_data(const void* begin, const void* end) { this_t::instance().send_data(m_fd, begin, end); } void send_data(stream& buffer) { this_t::instance().send_data(m_fd, buffer); } void close_connection() { this_t::instance().close_connection(m_fd); } void close_connection_after_writing() { this_t::instance().close_connection_after_writing(m_fd); } void shutdown_read() { this_t::instance().shutdown_read(m_fd); } void shutdown_write() { this_t::instance().shutdown_write(m_fd); } void shutdown_write_after_writing() { this_t::instance().shutdown_write_after_writing(m_fd); } int getpeername(struct sockaddr *addr, socklen_t *addrlen) { return ::getpeername(m_fd, addr, addrlen); } private: const int m_fd; private: stream_handler(); }; struct event_handler : public handler { virtual ~event_handler() {} virtual void operator() (int fd, short event) = 0; }; public: //! Add event handler. /* The event handler will be called when the event is satisfied. * Not that the EventHandler class must be a subclass of struct event_handler. */ template static int add_event(int fd, short event); MP_ARGS_BEGIN template static int add_event(int fd, short event, MP_ARGS_PARAMS); MP_ARGS_END //! Add stream handler. /* The stream handler will be called when at least 1byte of data is received. * Not that the StreamHandler class must be a subclass of struct stream_handler. */ template static int add(int fd); MP_ARGS_BEGIN template static int add(int fd, MP_ARGS_PARAMS); MP_ARGS_END template static int add_connect(int nonblocking_socket, struct sockaddr* addr, socklen_t addrlen ); MP_ARGS_BEGIN template static int add_connect(int nonblocking_socket, struct sockaddr* addr, socklen_t addrlen, MP_ARGS_PARAMS ); MP_ARGS_END static stream* allocate_stream(); static void destroy_stream(stream* s); //! Start event loop. static int run() { return instance().run_impl(); } //! Stop event loop. /*! This function is async-signal-safe. */ static void end() { instance().end_impl(); } private: int add_impl(int fd, short event, event_handler* obj); int add_impl(int fd, stream_handler* obj); int run_impl(); void end_impl(); void send_data(int fd, const void* buf, size_t len); void send_data(int fd, const void* begin, const void* end); void send_data(int fd, stream& buffer); void close_connection(int fd); void close_connection_after_writing(int fd); void shutdown_read(int fd); void shutdown_write(int fd); void shutdown_write_after_writing(int fd); private: class cb_t { public: cb_t(event_handler* obj, short event); cb_t(stream_handler* obj); ~cb_t(); public: event_handler& event_object(); stream_handler& stream_object(); public: short event() const; void event(short new_event); bool is_event_handler() const; stream& instream(); stream& outqueue(); enum { SCHED_CONTINUE = 0, SCHED_SHUTDOWN_WRITE, SCHED_CLOSE, } connection_schedule; private: handler* m_obj; short m_event; stream* m_instream; stream* m_outq; //std::queue m_outqueue; bool m_close_scheduled; private: cb_t(const cb_t&); }; typedef event event_t; event_t m_event; typedef mempool object_pool_t; static object_pool_t& object_pool(); static std::auto_ptr s_instance; static this_t& instance() { return *s_instance; } buffer(); private: volatile sig_atomic_t m_end_flag; private: void buffer_read(int fd, cb_t& cb); void buffer_write(int fd, cb_t& cb); void rswitch(int fd, cb_t& cb); void wswitch(int fd, cb_t& cb); void unbind_connection(int fd, cb_t& cb); int m_write_shortcut; private: buffer(const this_t&); }; } // namespace mp #include "mp/buffer_impl.h" #endif /* mp/buffer.h */