本文总结自《Linux高性能服务器编程》和张亮写的《Libevent源码深度剖析》及其博客
需要注意的是,张亮的pdf里基于的是低版本的Libevent源代码,有一些细节跟更新后的版本有出入,但是大体思路都一致。
Reator:反应堆”,是一种事件驱动机制。
应用程序提供相应的接口(“回调函数”)并注册到Reactor上,当相应的事件发生时,Libevent调用这些回调函数处理相应的事件(I/O读写、定时和信号)。
事件源:I/O框架要处理的对象(I/O读写、定时和信号事件)
一个事件通常和一个句柄在一起I/O ---------- 文件描述符信号 ---------- 信号值
Event Demultiplexer –事件多路分发机制:
程序首先将关心的句柄注册到event demultiplexer上。
内部调用I/O多路复用机制(select、epoll)
Reactor – 反应器
I/O框架库的核心,提供主要方法:
handler_events: 执行事件循环。等待事件,然后依次处理所有就绪事件对应的事件处理器
register_handler: 调用Event Demultiplexer的register_handler()注册一事件
remove_handler:调用Event Demultiplexer的remove_handler()删除事件多路分发器中的一个事件
Event Handler –事件处理器
I/O框架库提供的事件处理器通常是一个接口,用户继承他来实现自己的事件处理器,即具体事件处理器(Concrete Event Handler)
get_handler() :返回与该事件处理器关联的句柄。
事件处理分发器检测到有事件发生时,通过句柄通知应用程序。
Reactor事件处理流程
void signal_cb(int fd, short event, void *argc)
{struct event_base *base = (event_base *)argc;struct timeval delay ={2, 0};/*do something*/event_base_loopexit(base, &delay);
}void timeout_cb(int fd, short event, void *argc)
{/*do something*/
}int main()
{/*event_init 创建event_base对象,一个event_base相当于一个Reactor实例*/struct event_base *base = event_init(); /*evsiganl_new:信号处理器;
evtimer_new:定时时间处理器
创建具体的事件处理器,并设置他们所从属的Reactor实例*/struct event* signal_event = evsiganl_new(base, SIGINT, signal_cb, base);/*event_add:将事件处理器添加到注册事件队列中,并将该事件处理器对应的事件添加到事件多路分发器,相当于Reactor中的register_handler方法*/event_add(signal_event, NULL);timeval tv = {1, 0};struct event *timeout_event = evtimer_new(base, timeout_cb, NULL);event_add(timeout_event, NULL);event_base_dispatch(base); //执行事件循环/*释放系统资源*/event_free(timeout_event);event_free(signal_event);event_base_free(base);
}
从下边代码可以看出,evsiganl_new、evtimer_new其实质都是调用event_new()函数
#define evsignal_new(b, x, cb, arg) event_new((b), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg))#define evtimer_new(b, cb, arg) event_new((b), -1, 0, (cb), (arg))struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
event_new的各参数:
base:新创建的事件处理器从属的Reactor
fd:与该事件处理器关联的句柄
I/O事件处理器:fd为文件描述符值
信号事件处理器:fd为信号值
定时事件处理器:fd为-1
events:事件类型
/** Indicates that a timeout has occurred. It's not necessary to pass* this flag to event_for new()/event_assign() to get a timeout. */
#define EV_TIMEOUT 0x01 //定时事件
/** Wait for a socket or FD to become readable */
#define EV_READ 0x02 //可读事件
/** Wait for a socket or FD to become writeable */
#define EV_WRITE 0x04 //可写事件
/** Wait for a POSIX signal to be raised*/
#define EV_SIGNAL 0x08 //信号事件
/*** Persistent event: won't get removed automatically when activated.** When a persistent event with a timeout becomes activated, its timeout* is reset to 0.*/
#define EV_PERSIST 0x10 //永久事件,事件被触发后,自动重新对这个event调用event_add函数
/** Select edge-triggered behavior, if supported by the backend. */
#define EV_ET 0x20 //边沿触发事件
cb:目标事件对应的回调函数
arg:Reactor传递给回调函数的参数
返回一event类型的对象(即Libevent的事件处理器)
struct event {/*被激活的事件处理器->活动事件队列(有多个,优先级不同)*/TAILQ_ENTRY(event) ev_active_next;/*已注册的事件处理器->注册事件队列*/TAILQ_ENTRY(event) ev_next;/* for managing timeouts */union {TAILQ_ENTRY(event) ev_next_with_common_timeout; //标识在队列中的位置int min_heap_idx;} ev_timeout_pos; //定时器队列evutil_socket_t ev_fd; //fd或信号值struct event_base *ev_base;//该事件处理器从属的event_base实例union {/* used for io events */struct {TAILQ_ENTRY(event) ev_io_next;//有相同fd的I/O事件处理器,串在一队列中//这样,当一个fd上有事件发生时,时间多路分发器能很快把所有相关的事件处理器添加到活动事件队列中struct timeval ev_timeout;} ev_io;/* used by signal events */struct {TAILQ_ENTRY(event) ev_signal_next;//有相同信号值的信号事件处理器,串在一队列中short ev_ncalls; //事件发生时,执行多少次该事件对应的回调函数/* Allows deletes in callback */short *ev_pncalls;} ev_signal;} _ev;short ev_events; //事件类型short ev_res; //记录当前激活事件的类型short ev_flags; //事件标识:/*#define EVLIST_TIMEOUT 0x01 // event在time堆中#define EVLIST_INSERTED 0x02 // event在已注册事件链表中#define EVLIST_SIGNAL 0x04 // 未见使用#define EVLIST_ACTIVE 0x08 // event在激活链表中#define EVLIST_INTERNAL 0x10 // 内部使用标记#define EVLIST_INIT 0x80 // event已被初始化*/ev_uint8_t ev_pri; //事件处理器的优先级,越小越高ev_uint8_t ev_closure;struct timeval ev_timeout; //定时器超时值/* allows us to adopt for different types of events */void (*ev_callback)(evutil_socket_t, short, void *arg);void *ev_arg;
};
每次当有事件event转变为就绪状态时,libevent就会把它移入到active event list[priority]中,其中priority是event的优先级;
接着libevent会根据自己的调度策略选择就绪事件,调用其cb_callback()函数执行事件处理;并根据就绪的句柄和事件类型填充cb_callback函数的参数。
void event_set(struct event *ev, int fd, short events,void (*callback)(int, short, void *), void *arg);//默认情况下事件ev将被注册到current_base(一个全局event_base指针)上
int event_base_set(struct event_base *base, struct event *ev);int event_priority_set(struct event *ev, int pri);
event_base结构体:Libevent的Reactor
struct event_base {/** Function pointers and other data to describe this event_base's* backend. *///从eventops[]中选择一种后端I/O复用机制,eventop 结构体见下:const struct eventop *evsel;/** Pointer to backend-specific data. *///I/O复用机制真正存储的数据void *evbase;/** List of changes to tell backend about at next dispatch. Only used* by the O(1) backends. */struct event_changelist changelist;/** Function pointers used to describe the backend that this event_base* uses for signals */const struct eventop *evsigsel;/** Data to implement the common signal handelr code. *///信号事件处理器使用的数据结构struct evsig_info sig;/** Number of virtual events */int virtual_event_count;/** Number of total events added to this event_base */int event_count;/** Number of total events active in this event_base */int event_count_active;/** Set if we should terminate the loop once we're done processing* events. */int event_gotterm;/** Set if we should terminate the loop immediately */int event_break;/** Set if we should start a new instance of the loop immediately. */int event_continue;/** The currently running priority of events */int event_running_priority;/** Set if we're running the event_base_loop function, to prevent* reentrant invocation. */int running_loop;/* Active event management. *//** An array of nactivequeues queues for active events (ones that* have triggered, and whose callbacks need to be called). Low* priority numbers are more important, and stall higher ones.*///活动事件队列数组,activequeues[priority]:一个链表(activequeues为二级指针),链表每个节点指向优先级为priority的就绪事件eventstruct event_list *activequeues;/** The length of the activequeues array */int nactivequeues;/* common timeout logic *//** An array of common_timeout_list* for all of the common timeout* values we know. */struct common_timeout_list **common_timeout_queues;/** The number of entries used in common_timeout_queues */int n_common_timeouts;/** The total size of common_timeout_queues. */int n_common_timeouts_allocated;/** List of defered_cb that are active. We run these after the active* events. */*存放延迟函数的链表,事件循环每次成功处理完一个活动事件队列中的所有事件之后,就调用一次延迟回调函数*/struct deferred_cb_queue defer_queue;/** Mapping from file descriptors to enabled (added) events *///fd和I/O的映射关系表struct event_io_map io;/** Mapping from signal numbers to enabled (added) events. */struct event_signal_map sigmap;/** All events that have been enabled (added) in this event_base *///注册事件队列,存放I/O事件处理器和信号事件处理器struct event_list eventqueue;/** Stored timeval; used to detect when time is running backwards. */struct timeval event_tv;/** Priority queue of events with timeouts. */struct min_heap timeheap;/** Stored timeval: used to avoid calling gettimeofday/clock_gettime* too often. */struct timeval tv_cache;#if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)/** Difference between internal time (maybe from clock_gettime) and* gettimeofday. */struct timeval tv_clock_diff;/** Second in which we last updated tv_clock_diff, in monotonic time. */time_t last_updated_clock_diff;
#endif#ifndef _EVENT_DISABLE_THREAD_SUPPORT/* threading support *//** The thread currently running the event_loop for this base */unsigned long th_owner_id;/** A lock to prevent conflicting accesses to this event_base */void *th_base_lock;/** The event whose callback is executing right now */struct event *current_event;/** A condition that gets signalled when we're done processing an* event with waiters on it. */void *current_event_cond;/** Number of threads blocking on current_event_cond. */int current_event_waiters;
#endif#ifdef WIN32/** IOCP support structure, if IOCP is enabled. */struct event_iocp_port *iocp;
#endif/** Flags that this base was configured with */enum event_base_config_flag flags;/* Notify main thread to wake up break, etc. *//** True if the base already has a pending notify, and we don't need* to add any more. */int is_notify_pending;/** A socketpair used by some th_notify functions to wake up the main* thread. */evutil_socket_t th_notify_fd[2];/** An event used by some th_notify functions to wake up the main* thread. */struct event th_notify;/** A function used to wake up the main thread from another thread. */int (*th_notify_fn)(struct event_base *base);
};
eventop 结构体:封装了I/O复用机制必要的一些操作
struct eventop {/** The name of this backend. */const char *name;/** Function to set up an event_base to use this backend. It should* create a new structure holding whatever information is needed to* run the backend, and return it. The returned pointer will get* stored by event_init into the event_base.evbase field. On failure,* this function should return NULL. * 初始化函数*/void *(*init)(struct event_base *);/** Enable reading/writing on a given fd or signal. 'events' will be* the events that we're trying to enable: one or more of EV_READ,* EV_WRITE, EV_SIGNAL, and EV_ET. 'old' will be those events that* were enabled on this fd previously. 'fdinfo' will be a structure* associated with the fd by the evmap; its size is defined by the* fdinfo field below. It will be set to 0 the first time the fd is* added. The function should return 0 on success and -1 on error.* 注册事件*/int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);/** As "add", except 'events' contains the events we mean to disable. 删除事件*/ int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);/** Function to implement the core of an event loop. It must see whichadded events are ready, and cause event_active to be called for eachactive event (usually via event_io_active or such). It shouldreturn 0 on success and -1 on error.等待事件(事件分发) */int (*dispatch)(struct event_base *, struct timeval *);/** Function to clean up and free our data from the event_base. 注销:释放I/O复用机制使用的资源*/void (*dealloc)(struct event_base *);/** Flag: set if we need to reinitialize the event base after we fork.*/int need_reinit;/** Bit-array of supported event_method_features that this backend can* provide. */enum event_method_feature features;/** Length of the extra information we should record for each fd thathas one or more active events. This information is recordedas part of the evmap entry for each fd, and passed as an argumentto the add and del functions above.*/size_t fdinfo_len;
};
事件主循环:
- 调用系统提供的I/O事件多路分发机制执行事件循环(事件监听函数 -> 等待事件)- 对已注册的事件,调用注册事件的回调函数来处理事件
int event_base_loop(struct event_base *base, int flags)源码分析:
int
event_base_loop(struct event_base *base, int flags)
{const struct eventop *evsel = base->evsel;struct timeval tv;struct timeval *tv_p;int res, done, retval = 0;/* Grab the lock. We will release it inside evsel.dispatch, and again* as we invoke user callbacks. */EVBASE_ACQUIRE_LOCK(base, th_base_lock);if (base->running_loop) {event_warnx("%s: reentrant invocation. Only one event_base_loop"" can run on each event_base at once.", __func__);EVBASE_RELEASE_LOCK(base, th_base_lock);return -1;}base->running_loop = 1;clear_time_cache(base);//设置信号事件的event_base实例if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)evsig_set_base(base);/*voidevsig_set_base(struct event_base *base){EVSIGBASE_LOCK();evsig_base = base; //evsig_base :全局变量evsig_base_n_signals_added = base->sig.ev_n_signals_added;evsig_base_fd = base->sig.ev_signal_pair[0];EVSIGBASE_UNLOCK();}*/done = 0;#ifndef _EVENT_DISABLE_THREAD_SUPPORTbase->th_owner_id = EVTHREAD_GET_ID();
#endifbase->event_gotterm = base->event_break = 0;while (!done) {base->event_continue = 0;/* Terminate the loop if we have been asked to *///查看是否需要跳出循环,程序可以调用event_loopexit_cb()设置event_gotterm标记if (base->event_gotterm) {break;}//调用event_base_loopbreak()设置event_break标记 if (base->event_break) {break;}//校准系统时间。若系统使用的是非MONOTONIC时间,用户可能会向后调整了系统时间 // 在timeout_correct函数里,比较last wait time和当前时间,如果当前时间< last wait time // 表明时间有问题,这是需要更新timer_heap中所有定时事件的超时时间。 timeout_correct(base, &tv);tv_p = &tv;if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {//根据timer heap(时间堆)中事件的最小超时时间(堆顶元素),计算系统I/O demultiplexer的最大等待时间(即I/O复用系统调用应设置的超时时间) timeout_next(base, &tv_p);} else {/** if we have active events, we just poll new events* without waiting.* 依然有未处理的就绪时间,就让I/O demultiplexer立即返回,不必等待 */evutil_timerclear(&tv);}/* If we have no events, we just exit *如果event_base中没有注册事件,就直接退出事件循环*/if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {event_debug(("%s: no events registered.", __func__));retval = 1;goto done;}/* update last old time 更新系统时间,清空时间缓存*/gettime(base, &base->event_tv);clear_time_cache(base);// 调用系统I/O demultiplexer等待就绪I/O events,可能是epoll_wait,或者select等; // 在evsel->dispatch()中,会把就绪signal event、I/O event插入到激活链表中 res = evsel->dispatch(base, tv_p);if (res == -1) {event_debug(("%s: dispatch returned unsuccessfully.",__func__));retval = -1;goto done;}// 将time cache赋值为当前系统时间 /*此处需要注意:采用时间缓存目的在于,不需要每次获取时间都执行系统调用(费时)*上边的gettime(base, &base->event_tv)和update_time_cache(base)这两次获取时间*都是ev_cache缓存的时间*/update_time_cache(base);// 检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中timeout_process(base);if (N_ACTIVE_CALLBACKS(base)) {// 调用event_process_active()处理激活链表中的就绪的信号事件和I/O事件,调用其回调函数执行事件处理 // 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表, // 然后处理链表中的所有就绪事件; // 因此低优先级的就绪事件可能得不到及时处理; int n = event_process_active(base);if ((flags & EVLOOP_ONCE)&& N_ACTIVE_CALLBACKS(base) == 0&& n != 0)done = 1;} else if (flags & EVLOOP_NONBLOCK)done = 1;}event_debug(("%s: asked to terminate loop.", __func__));done://事件循环结束,清空时间缓存clear_time_cache(base);base->running_loop = 0;EVBASE_RELEASE_LOCK(base, th_base_lock);return (retval);
}
流程如下图:
Libevent将Timer和Signal事件都统一到了系统的I/O 的demultiplex机制中:
int
evutil_socketpair(int family, int type, int protocol, evutil_socket_t fd[2]);//主要调用如下函数:
int
evutil_ersatz_socketpair(int family, int type, int protocol,evutil_socket_t fd[2])
{/* This code is originally from Tor. Used with permission. *//* This socketpair does not work when localhost is down. So* it's really not the same thing at all. But it's close enough* for now, and really, when localhost is down sometimes, we* have other problems too.*/
#ifdef WIN32
#define ERR(e) WSA##e
#else
#define ERR(e) e
#endifevutil_socket_t listener = -1;evutil_socket_t connector = -1;evutil_socket_t acceptor = -1;struct sockaddr_in listen_addr;struct sockaddr_in connect_addr;ev_socklen_t size;int saved_errno = -1;if (protocol|| (family != AF_INET
#ifdef AF_UNIX&& family != AF_UNIX
#endif)) {EVUTIL_SET_SOCKET_ERROR(ERR(EAFNOSUPPORT));return -1;}if (!fd) {EVUTIL_SET_SOCKET_ERROR(ERR(EINVAL));return -1;}listener = socket(AF_INET, type, 0);if (listener < 0)return -1;memset(&listen_addr, 0, sizeof(listen_addr));listen_addr.sin_family = AF_INET;listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);listen_addr.sin_port = 0; /* kernel chooses port. */if (bind(listener, (struct sockaddr *) &listen_addr, sizeof (listen_addr))== -1)goto tidy_up_and_fail;if (listen(listener, 1) == -1)goto tidy_up_and_fail;connector = socket(AF_INET, type, 0);if (connector < 0)goto tidy_up_and_fail;/* We want to find out the port number to connect to. */size = sizeof(connect_addr);if (getsockname(listener, (struct sockaddr *) &connect_addr, &size) == -1)goto tidy_up_and_fail;if (size != sizeof (connect_addr))goto abort_tidy_up_and_fail;if (connect(connector, (struct sockaddr *) &connect_addr,sizeof(connect_addr)) == -1)goto tidy_up_and_fail;size = sizeof(listen_addr);acceptor = accept(listener, (struct sockaddr *) &listen_addr, &size);if (acceptor < 0)goto tidy_up_and_fail;if (size != sizeof(listen_addr))goto abort_tidy_up_and_fail;/* Now check we are talking to ourself by matching port and host on thetwo sockets. */if (getsockname(connector, (struct sockaddr *) &connect_addr, &size) == -1)goto tidy_up_and_fail;if (size != sizeof (connect_addr)|| listen_addr.sin_family != connect_addr.sin_family|| listen_addr.sin_addr.s_addr != connect_addr.sin_addr.s_addr|| listen_addr.sin_port != connect_addr.sin_port)goto abort_tidy_up_and_fail;evutil_closesocket(listener);fd[0] = connector;fd[1] = acceptor;return 0;abort_tidy_up_and_fail:saved_errno = ERR(ECONNABORTED);tidy_up_and_fail:if (saved_errno < 0)saved_errno = EVUTIL_SOCKET_ERROR();if (listener != -1)evutil_closesocket(listener);if (connector != -1)evutil_closesocket(connector);if (acceptor != -1)evutil_closesocket(acceptor);EVUTIL_SET_SOCKET_ERROR(saved_errno);return -1;
#undef ERR
}
大体流程为:
2. 将socket pair的读socket在libevent的event_base实例上注册一个persist的读事件
int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
注:libevent没有同步保护机制
消息通知机制 – 消息通知+同步层:
例:
上述中的工作队列即一加锁的容器(队列、链表等),boss发来的消息:消息通知,仅需一字节,降低了通信开销
本文发布于:2024-02-01 08:16:57,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170674661735164.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |