libev 事件库

2016-12-30 Friday     linux , program

libev 是一个 C 语言编写的,高性能的事件循环库,与此类似的事件循环库还有 libevent、libubox 等,在此详细介绍下 libev 相关的内容。

简介

关于 libev 详见官网 http://software.schmorp.de,其帮助文档可以参考 官方文档,安装完之后,可通过 man 3 ev 查看帮助信息,文档也在源码中保存了一份,可以通过 man -l ev.3 命令查看。

安装

安装可以源码安装,或者在 CentOS 中,最简单可通过如下方式安装。

----- 安装库
# yum install libev libev-devel

示例程序

如下是一个简单的示例程序。

#include <ev.h>       // a single header file is required
#include <stdio.h>    // for puts

// every watcher type has its own typedef'd struct with the name ev_TYPE
ev_io stdin_watcher;
ev_timer timeout_watcher;

// all watcher callbacks have a similar signature
// this callback is called when data is readable on stdin
static void stdin_cb (EV_P_ ev_io *w, int revents)
{
    puts ("stdin ready");
    // for one-shot events, one must manually stop the watcher
    // with its corresponding stop function.
    ev_io_stop (EV_A_ w);

    // this causes all nested ev_run's to stop iterating
    ev_break (EV_A_ EVBREAK_ALL);
}

// another callback, this time for a time-out
static void timeout_cb (EV_P_ ev_timer *w, int revents)
{
    puts ("timeout");
    // this causes the innermost ev_run to stop iterating
    ev_break (EV_A_ EVBREAK_ONE);
}

int main (void)
{
    // use the default event loop unless you have special needs
    struct ev_loop *loop = EV_DEFAULT; /* OR ev_default_loop(0) */

    // initialise an io watcher, then start it
    // this one will watch for stdin to become readable
    ev_io_init (&stdin_watcher, stdin_cb, /*STDIN_FILENO*/ 0, EV_READ);
    ev_io_start (EV_A_ &stdin_watcher);

    // initialise a timer watcher, then start it
    // simple non-repeating 5.5 second timeout
    ev_timer_init (&timeout_watcher, timeout_cb, 5.5, 0.);
    ev_timer_start (EV_A_ &timeout_watcher);

    // now wait for events to arrive
    ev_run (loop, 0);

    // break was called, so exit
    return 0;
}

可以通过如下命令编译。

----- 编译示例程序
$ gcc -lev example.c -o example

执行过程

libev 是一个事件循环,首先需要注册感兴趣的事件,libev 会监控这些事件,当事件发生时调用相应的处理函数,也就是回调函数。其处理过程为:

  1. 初始化一个事件循环
    可以通过 ev_default_loop(0) 或者 EV_DEFAULT 初始化,两者等价。
  2. 定义数据
    在 ev.h 中定义了各种类型的,如 ev_io、ev_timer、ev_signal等。
  3. 注册感兴趣的事件
    在此被称为 watchers ,这个是 C 结构体。
  4. 启动监控
    启动上步注册的事件,如 ev_io_start()、ev_timer_start() 等。
  5. 启动 libev
    重复 1,2 步,然后启动 libev 事件循环,直接执行 ev_run() 即可。

源码详解

libev 通过观察器 (watcher) 来监听各种事件,watcher 包括了事件类型、优先级、触发条件和回调函数等参数;将其注册到事件循环上,在满足注册的条件时,会触发观察器,调用它的回调函数。

enum {
  EV_UNDEF    = (int)0xFFFFFFFF, /* guaranteed to be invalid */
  EV_NONE     =            0x00, /* no events */
  EV_READ     =            0x01, /* ev_io detected read will not block */
  EV_WRITE    =            0x02, /* ev_io detected write will not block */
  EV__IOFDSET =            0x80, /* internal use only */
  EV_IO       =         EV_READ, /* alias for type-detection */
  EV_TIMER    =      0x00000100, /* timer timed out */
#if EV_COMPAT3
  EV_TIMEOUT  =        EV_TIMER, /* pre 4.0 API compatibility */
#endif
  EV_PERIODIC =      0x00000200, /* periodic timer timed out */
  EV_SIGNAL   =      0x00000400, /* signal was received */
  EV_CHILD    =      0x00000800, /* child/pid had status change */
  EV_STAT     =      0x00001000, /* stat data changed */
  EV_IDLE     =      0x00002000, /* event loop is idling */
  EV_PREPARE  =      0x00004000, /* event loop about to poll */
  EV_CHECK    =      0x00008000, /* event loop finished poll */
  EV_EMBED    =      0x00010000, /* embedded event loop needs sweep */
  EV_FORK     =      0x00020000, /* event loop resumed in child */
  EV_CLEANUP  =      0x00040000, /* event loop resumed in child */
  EV_ASYNC    =      0x00080000, /* async intra-loop signal */
  EV_CUSTOM   =      0x01000000, /* for use by user code */
  EV_ERROR    = (int)0x80000000  /* sent when an error occurs */

libev 中的观察器分为 4 种状态:初始化、启动/活动、等待、停止。

首先需要对 watcher 初始化,可通过 ev_TYPE_init() 或者 ev_init()+ev_TYPE_set() 初始化,两者等效;实际就是设置对应结构体的初始值。

#define ev_io_init(ev,cb,fd,events)              \
    do { ev_init ((ev), (cb)); ev_io_set ((ev),(fd),(events)); } while (0)
#define ev_timer_init(ev,cb,after,repeat)        \
    do { ev_init ((ev), (cb)); ev_timer_set ((ev),(after),(repeat)); } while (0)
#define ev_periodic_init(ev,cb,ofs,ival,rcb)     \
    do { ev_init ((ev), (cb)); ev_periodic_set ((ev),(ofs),(ival),(rcb)); } while (0)
#define ev_signal_init(ev,cb,signum)             \
    do { ev_init ((ev), (cb)); ev_signal_set ((ev), (signum)); } while (0)
#define ev_child_init(ev,cb,pid,trace)           \
    do { ev_init ((ev), (cb)); ev_child_set ((ev),(pid),(trace)); } while (0)
#define ev_stat_init(ev,cb,path,interval)        \
    do { ev_init ((ev), (cb)); ev_stat_set ((ev),(path),(interval)); } while (0)
#define ev_idle_init(ev,cb)                      \
    do { ev_init ((ev), (cb)); ev_idle_set ((ev)); } while (0)
#define ev_prepare_init(ev,cb)                   \
    do { ev_init ((ev), (cb)); ev_prepare_set ((ev)); } while (0)
#define ev_check_init(ev,cb)                     \
    do { ev_init ((ev), (cb)); ev_check_set ((ev)); } while (0)
#define ev_embed_init(ev,cb,other)               \
    do { ev_init ((ev), (cb)); ev_embed_set ((ev),(other)); } while (0)
#define ev_fork_init(ev,cb)                      \
    do { ev_init ((ev), (cb)); ev_fork_set ((ev)); } while (0)
#define ev_cleanup_init(ev,cb)                   \
    do { ev_init ((ev), (cb)); ev_cleanup_set ((ev)); } while (0)
#define ev_async_init(ev,cb)                     \
    do { ev_init ((ev), (cb)); ev_async_set ((ev)); } while (0)

接下来,通过 ev_TYPE_start()ev_TYPE_stop() 来启动、停止观察器,停止同时会释放内存。

结构体

libev 通过 C 语言实现,其中通过宏实现了一种类似的继承机制,也就是其中各种 Watchers 的部分成员变量是相同的,只有少部分成员为各自独有,接下来简单介绍下。

每个 watcher 都会包含 EV_WATCHER 宏定义的内容,该宏实际会包含如下内容,其中 type 对应类型,如 ev_io 等。

# define EV_CB_DECLARE(type) void (*cb)(EV_P_ struct type *w, int revents);

#define EV_WATCHER(type) \
  int active;          /* private,是否激活,通过start()/stop()处理 */ \
  int pending;         /* private,有事件就绪等待处理,对应了等待队列的下标 */ \
  EV_DECL_PRIORITY     /* private,定义优先级,如果没有使用优先级则是空 */ \
  EV_COMMON            /* rw,私有数据,一般是void *data */ \
  EV_CB_DECLARE (type) /* private,回调函数 */

#define EV_WATCHER_LIST(type)           \
  EV_WATCHER (type)                     \
  struct ev_watcher_list *next; /* private */

#define EV_WATCHER_TIME(type)           \
  EV_WATCHER (type)                     \
  ev_tstamp at;                 /* private */

typedef struct ev_watcher {
  EV_WATCHER (ev_watcher)
} ev_watcher;

typedef struct ev_watcher_list {
  EV_WATCHER_LIST (ev_watcher_list)
} ev_watcher_list;

typedef struct ev_io {
  EV_WATCHER_LIST (ev_io)

  int fd;     /* ro */
  int events; /* ro */
} ev_io;

typedef struct ev_timer {
  EV_WATCHER_TIME (ev_timer)

  ev_tstamp repeat; /* rw */
} ev_timer;

如上的 ev_watcher 结构体可以时为 “基类”,通过宏 EV_WATCHER 定义了它的所有成员;而像 IO Watcher、Signal Watcher 是以链表的形式进行组织的,所以在 ev_watcher 基类的基础上,定义了 ev_watcher 的子类 ev_watcher_list 。

多实例支持

ev_loop 是主循环,保存了与循环相关的很多变量,而 EV_MULTIPLICITY 是一个条件编译的宏,表明是否支持有多个 ev_loop 实例存在,表现在源码中表示是否需要传递 struct ev_loop *loop 参数,一般来说,每个线程中有且仅有一个 ev_loop 实例。

系统时间

先看下 libev 是如何使用时间的,因为该库中很多与时间相关的操作;在 libev.m4 中,定义了与之相关的宏,如下所示。

AC_CHECK_FUNCS(clock_gettime, [], [
   dnl on linux, try syscall wrapper first
   if test $(uname) = Linux; then
      AC_MSG_CHECKING(for clock_gettime syscall)
      AC_LINK_IFELSE([AC_LANG_PROGRAM(
         [#include <unistd.h>
          #include <sys/syscall.h>
          #include <time.h>],
         [struct timespec ts; int status = syscall (SYS_clock_gettime, CLOCK_REALTIME, &ts)])],
         [ac_have_clock_syscall=1
          AC_DEFINE(HAVE_CLOCK_SYSCALL, 1, Define to 1 to use the syscall interface for clock_gettime)
          AC_MSG_RESULT(yes)],
         [AC_MSG_RESULT(no)])
   fi
   if test -z "$LIBEV_M4_AVOID_LIBRT" && test -z "$ac_have_clock_syscall"; then
      AC_CHECK_LIB(rt, clock_gettime)
      unset ac_cv_func_clock_gettime
      AC_CHECK_FUNCS(clock_gettime)
   fi
])

AC_CHECK_FUNCS(nanosleep, [], [
   if test -z "$LIBEV_M4_AVOID_LIBRT"; then
      AC_CHECK_LIB(rt, nanosleep)
      unset ac_cv_func_nanosleep
      AC_CHECK_FUNCS(nanosleep)
   fi
])

clock_gettime() 函数的调用有两种方式,分别是系统调用和 -lrt 库;在上述的 libev.m4 中,会进行检测,首先会检测 clock_gettime() 系统调用是否可用,如果可用会定义 HAVE_CLOCK_SYSCALL 宏。

libev 提供了单调递增 (monotonic) 以及实时时间 (realtime) 两种记时方式,其宏定义的方式如下,而 HAVE_CLOCK_SYSCALLHAVE_CLOCK_GETTIME 的详见 libev.m4 中定义,优先使用 SYS_clock_gettime() 系统调用 API 函数。

# if HAVE_CLOCK_SYSCALL
#  ifndef EV_USE_CLOCK_SYSCALL
#   define EV_USE_CLOCK_SYSCALL 1
#   ifndef EV_USE_REALTIME
#    define EV_USE_REALTIME  0
#   endif
#   ifndef EV_USE_MONOTONIC
#    define EV_USE_MONOTONIC 1
#   endif
#  endif
# elif !defined EV_USE_CLOCK_SYSCALL
#  define EV_USE_CLOCK_SYSCALL 0
# endif

# if HAVE_CLOCK_GETTIME
#  ifndef EV_USE_MONOTONIC
#   define EV_USE_MONOTONIC 1
#  endif
#  ifndef EV_USE_REALTIME
#   define EV_USE_REALTIME  0
#  endif
# else
#  ifndef EV_USE_MONOTONIC
#   define EV_USE_MONOTONIC 0
#  endif
#  ifndef EV_USE_REALTIME
#   define EV_USE_REALTIME  0
#  endif
# endif

优先使用系统调用和单调递增时间,在 CentOS 7 中通常定义为。

#define HAVE_CLOCK_GETTIME 1
#define EV_USE_REALTIME  0
#define EV_USE_MONOTONIC 1

在如下的初始化函数中介绍详细的细节。

初始化

无论是通过 EV_DEFAULT 宏还是 ev_default_loop() 函数进行初始化,实际上功能都相同,也就是都调用了 ev_default_loop(0) 进行初始化,主要流程为 ev_default_loop()->loop_init()

如下主要介绍 loop_init() 函数。

#ifndef EV_HAVE_EV_TIME
ev_tstamp
ev_time (void) EV_THROW
{
#if EV_USE_REALTIME
  if (expect_true (have_realtime))
    {
      struct timespec ts;
      clock_gettime (CLOCK_REALTIME, &ts);
      return ts.tv_sec + ts.tv_nsec * 1e-9;
    }
#endif

  struct timeval tv;
  gettimeofday (&tv, 0);
  return tv.tv_sec + tv.tv_usec * 1e-6;
}
#endif

inline_size ev_tstamp
get_clock (void)
{
#if EV_USE_MONOTONIC
  if (expect_true (have_monotonic))
    {
      struct timespec ts;
      clock_gettime (CLOCK_MONOTONIC, &ts);
      return ts.tv_sec + ts.tv_nsec * 1e-9;
    }
#endif

  return ev_time ();
}

void noinline ecb_cold loop_init (EV_P_ unsigned int flags) EV_THROW
{
  if (!backend) {  // 如果backend还没有确定
      origflags = flags;

#if EV_USE_REALTIME
      if (!have_realtime)
        {
          struct timespec ts;

          if (!clock_gettime (CLOCK_REALTIME, &ts))
            have_realtime = 1;
        }
#endif

#if EV_USE_MONOTONIC
      if (!have_monotonic)
        {
          struct timespec ts;

          if (!clock_gettime (CLOCK_MONOTONIC, &ts))
            have_monotonic = 1;
        }
#endif

      /* pid check not overridable via env */
#ifndef _WIN32
      if (flags & EVFLAG_FORKCHECK)
        curpid = getpid ();
#endif

      if (!(flags & EVFLAG_NOENV)
          && !enable_secure ()
          && getenv ("LIBEV_FLAGS"))
        flags = atoi (getenv ("LIBEV_FLAGS"));

      ev_rt_now          = ev_time ();
      mn_now             = get_clock ();
      now_floor          = mn_now;
      rtmn_diff          = ev_rt_now - mn_now;
#if EV_FEATURE_API
      invoke_cb          = ev_invoke_pending;
#endif

      io_blocktime       = 0.;
      timeout_blocktime  = 0.;
      backend            = 0;
      backend_fd         = -1;
      sig_pending        = 0;
#if EV_ASYNC_ENABLE
      async_pending      = 0;
#endif
      pipe_write_skipped = 0;
      pipe_write_wanted  = 0;
      evpipe [0]         = -1;
      evpipe [1]         = -1;
#if EV_USE_INOTIFY
      fs_fd              = flags & EVFLAG_NOINOTIFY ? -1 : -2;
#endif
#if EV_USE_SIGNALFD
      sigfd              = flags & EVFLAG_SIGNALFD  ? -2 : -1;
#endif

      if (!(flags & EVBACKEND_MASK))
        flags |= ev_recommended_backends ();

#if EV_USE_IOCP
      if (!backend && (flags & EVBACKEND_IOCP  )) backend = iocp_init   (EV_A_ flags);
#endif
#if EV_USE_PORT
      if (!backend && (flags & EVBACKEND_PORT  )) backend = port_init   (EV_A_ flags);
#endif
#if EV_USE_KQUEUE
      if (!backend && (flags & EVBACKEND_KQUEUE)) backend = kqueue_init (EV_A_ flags);
#endif
#if EV_USE_EPOLL
      if (!backend && (flags & EVBACKEND_EPOLL )) backend = epoll_init  (EV_A_ flags);
#endif
#if EV_USE_POLL
      if (!backend && (flags & EVBACKEND_POLL  )) backend = poll_init   (EV_A_ flags);
#endif
#if EV_USE_SELECT
      if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags);
#endif

      ev_prepare_init (&pending_w, pendingcb);

#if EV_SIGNAL_ENABLE || EV_ASYNC_ENABLE
      ev_init (&pipe_w, pipecb);
      ev_set_priority (&pipe_w, EV_MAXPRI);
#endif
    }
}

其中有两个比较重要的时间变量,也就是 ev_rt_nowmn_now,前者表示当前的日历时间,也就是自 1970.01.01 以来的秒数,该值通过 gettimeofday() 得到。

调用流程

在介绍各个 Watcher 的流程之前,首先看下主循环的执行过程。

该函数通常是在各个事件初始化完成之后调用,也就是等待操作系统的事件,然后调用已经注册的回调函数,并一直重复循环执行。

int ev_run (EV_P_ int flags)
{
  ++loop_depth;      // 如果定义了EV_FEATURE_API宏
  loop_done = EVBREAK_CANCEL;
  EV_INVOKE_PENDING; // 在执行前确认所有的事件已经执行

  do {
      ev_verify (EV_A);  // 当EV_VERIFY >= 2时,用于校验当前的结构体是否正常
      if (expect_false (curpid)) /* penalise the forking check even more */
        if (expect_false (getpid () != curpid)) {
            curpid = getpid ();
            postfork = 1;
        }

      /* we might have forked, so queue fork handlers */
      if (expect_false (postfork))
        if (forkcnt) {
            queue_events (EV_A_ (W *)forks, forkcnt, EV_FORK);
            EV_INVOKE_PENDING;
        }

      /* queue prepare watchers (and execute them) */
      if (expect_false (preparecnt)) {
          queue_events (EV_A_ (W *)prepares, preparecnt, EV_PREPARE);
          EV_INVOKE_PENDING;
      }

      if (expect_false (loop_done))
        break;

      /* we might have forked, so reify kernel state if necessary */
      if (expect_false (postfork))
        loop_fork (EV_A);

      /* update fd-related kernel structures */
      fd_reify (EV_A);

      /* calculate blocking time */
      {
        ev_tstamp waittime  = 0.;
        ev_tstamp sleeptime = 0.;

        /* remember old timestamp for io_blocktime calculation */
        ev_tstamp prev_mn_now = mn_now;

        /* 会更新当前时间mn_now和ev_rt_now,如果发现时间被调整,则调用
         * timers_reschedule()函数调整堆loop->timers()中的每个节点。
         */
        time_update (EV_A_ 1e100);

        /* from now on, we want a pipe-wake-up */
        pipe_write_wanted = 1;

        ECB_MEMORY_FENCE; /* make sure pipe_write_wanted is visible before we check for potential skips */

        if (expect_true (!(flags & EVRUN_NOWAIT || idleall || !activecnt || pipe_write_skipped))) {
            waittime = MAX_BLOCKTIME;

            if (timercnt) {    // 如果有定时器存在则重新计算等待时间
                ev_tstamp to = ANHE_at (timers [HEAP0]) - mn_now;
                if (waittime > to) waittime = to;
            }
            if (periodiccnt) { // 如果定义了EV_PERIODIC_ENABLE宏
                ev_tstamp to = ANHE_at (periodics [HEAP0]) - ev_rt_now;
                if (waittime > to) waittime = to;
            }

            /* don't let timeouts decrease the waittime below timeout_blocktime */
            if (expect_false (waittime < timeout_blocktime)) // 默认timeout_blocktime为0
              waittime = timeout_blocktime;

            /* at this point, we NEED to wait, so we have to ensure */
            /* to pass a minimum nonzero value to the backend */
            if (expect_false (waittime < backend_mintime))
              waittime = backend_mintime;  // 不同的后端最小等待时间不同

            /* extra check because io_blocktime is commonly 0 */
            if (expect_false (io_blocktime)) {
                sleeptime = io_blocktime - (mn_now - prev_mn_now);

                if (sleeptime > waittime - backend_mintime)
                  sleeptime = waittime - backend_mintime;

                if (expect_true (sleeptime > 0.)) {
                    ev_sleep (sleeptime);
                    waittime -= sleeptime;
                }
            }
        }

#if EV_FEATURE_API
        ++loop_count;
#endif
        /* 调用IO复用函数,例如epoll_poll(),在此需要保证阻塞时间小于loop->timers,
         * 以及loop->periodics的栈顶元素的触发时间。
         */
        assert ((loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */
        backend_poll (EV_A_ waittime);
        assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */

        pipe_write_wanted = 0; /* just an optimisation, no fence needed */

        ECB_MEMORY_FENCE_ACQUIRE;
        if (pipe_write_skipped) {
            assert (("libev: pipe_w not active, but pipe not written", ev_is_active (&pipe_w)));
            ev_feed_event (EV_A_ &pipe_w, EV_CUSTOM);
        }

        /* update ev_rt_now, do magic */
        time_update (EV_A_ waittime + sleeptime); // 更新时间,防止timejump
      }

      /* 如果栈顶元素的超时时间已经超过了当前时间,则将栈顶元素的监控器添加到
       * loop->pendings中,并调整堆结构,接着判断栈顶元素是否仍超时,一致重复,
       * 直到栈顶元素不再超时。
       */
      timers_reify (EV_A); /* relative timers called last */
      periodics_reify (EV_A); /* absolute timers called first */

      /* queue idle watchers unless other events are pending */
      idle_reify (EV_A);

      /* queue check watchers, to be executed first */
      if (expect_false (checkcnt))
        queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK);

      /* 按照优先级,顺序遍厉loop->pendings数组,调用其中每个监视器的回调函数 */
      EV_INVOKE_PENDING;
    } while (expect_true (
        activecnt
        && !loop_done
        && !(flags & (EVRUN_ONCE | EVRUN_NOWAIT))
    ));

  if (loop_done == EVBREAK_ONE)
    loop_done = EVBREAK_CANCEL;

#if EV_FEATURE_API
  --loop_depth;
#endif

  return activecnt;
}

IO Watcher

对 IO 事件的监控的函数,会在 loop_init() 中初始化 backend_poll 变量,正是通过该函数监控 io 事件,如下是一个简单的示例。

void cb (struct ev_loop *loop, ev_io *w, int revents)
{
    ev_io_stop (loop, w);
    // .. read from stdin here (or from w->fd) and handle any I/O errors
}
ev_io watcher;
ev_io_init (&watcher, cb, STDIN_FILENO, EV_READ);  // 初始化,第三个是文件描述符,第四个是监听事件
ev_io_start (loop, &watcher);

其中,ev_io_init() 用来设置结构体的参数,除了初始化通用的变量之外,还包括 io 观察器对应的 fd 和 event 。

ev_io_start()

作用是设置 ANFD anfds[],其中文件描述符为其序号,并将相应的 IO Watcher 插入到对应 fd 的链表中。由于对应 fd 的监控条件已有改动了,同时会在 int fdchanges[] 中记录下该 fd ,并在后续的步骤中调用系统的接口修改对该 fd 监控条件。

void noinline ev_io_start (EV_P_ ev_io *w) EV_THROW
{
  int fd = w->fd;
  if (expect_false (ev_is_active (w))) // 如果已经启动则直接退出
    return;
  EV_FREQUENT_CHECK;                   // 通过ev_verify()校验数据格式是否正常

  ev_start (EV_A_ (W)w, 1);            // 设置watch->active变量
  array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);
  wlist_add (&anfds[fd].head, (WL)w);

  // 添加到fdchanges[]数组中
  fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);
  w->events &= ~EV__IOFDSET;

  EV_FREQUENT_CHECK;                   // 如上,通过ev_verify()校验数据格式是否正常
}

libev io watcher

调用 ev_run() 开始等待事件的触发,该函数中首先会调用 fd_reify(),该函数根据 fdchanges[] 中记录的描述符,将该描述符上的事件添加到 backend 所使用的数据结构中;调用 time_update() 更新当前时间。

接着计算超时时间,并调用 backend_poll() 开始等待事件的发生,如果事件在规定时间内触发的话,则会调用 fd_event() 将触发的监视器记录到 pendings 中;

backend 监听函数 (如 select()、poll()、epoll_wait()等) 返回后,再次调用 time_update() 更新时间,然后调用 ev_invoke_pending() ,依次处理 pendings 中的监视器,调用该监视器的回调函数。

fd_reify()

该函数在 ev_run() 的每轮循环中都会调用;会将 fdchanges 中记录的这些新事件一个个的处理,并调用后端 IO 复用的 backend_modify 宏。

多路复用

当前支持的多路复用通过如下方式定义,

/* method bits to be ored together */
enum {
  EVBACKEND_SELECT  = 0x00000001U, /* about anywhere */
  EVBACKEND_POLL    = 0x00000002U, /* !win */
  EVBACKEND_EPOLL   = 0x00000004U, /* linux */
  EVBACKEND_KQUEUE  = 0x00000008U, /* bsd */
  EVBACKEND_DEVPOLL = 0x00000010U, /* solaris 8 */ /* NYI */
  EVBACKEND_PORT    = 0x00000020U, /* solaris 10 */
  EVBACKEND_ALL     = 0x0000003FU, /* all known backends */
  EVBACKEND_MASK    = 0x0000FFFFU  /* all future backends */
};

而在通过 configure 进行编译时,会对宏进行处理,以 epoll 为例,可以查看 ev.c 中的内容;在通过 configure 编译时,如果支持 EPOLL 会在 config.h 中生成 HAVE_POLLHAVE_POLL_H 宏定义。

# if HAVE_POLL && HAVE_POLL_H
#  ifndef EV_USE_POLL
#   define EV_USE_POLL EV_FEATURE_BACKENDS
#  endif
# else
#  undef EV_USE_POLL
#  define EV_USE_POLL 0
# endif

之后调用 ev_recommended_backends() 得到当前系统支持的 backend 类型,比如 select、poll、epoll 等;然后,接下来就是根据系统支持的 backend,按照一定的优先顺序,去初始化 backend 。

Signal Watcher

在收到 SIGINT 时做些清理,直接退出。

static void sigint_cb (struct ev_loop *loop, ev_signal *w, int revents)
{
    ev_break (EV_A_ EVBREAK_ALL);
}
ev_signal signal_watcher;
ev_signal_init (&signal_watcher, sigint_cb, SIGINT);
ev_signal_start (loop, &signal_watcher);
/* catch a SIGINT signal, ctrl-c */
#include <ev.h>       // a single header file is required
#include <stdio.h>    // for puts

ev_signal signal_watcher;
static void sigint_cb (struct ev_loop *loop, ev_signal *w, int revents)
{
    puts("catch SIGINT");
    ev_break (EV_A_ EVBREAK_ALL);
}

int main (void)
{
    // use the default event loop unless you have special needs
    struct ev_loop *loop = EV_DEFAULT; /* OR ev_default_loop(0) */

    ev_signal_init (&signal_watcher, sigint_cb, SIGINT);
    ev_signal_start (loop, &signal_watcher);

    // now wait for events to arrive
    ev_run (loop, 0);

    // break was called, so exit
    return 0;
}

Fork Watcher

在 libev 中提供了一个 fork 事件的监控,libev 会在循环中自动检测是否调用了 fork() 函数,如果是那么会重新设置事件驱动回调函数。

除了自动判断,也可以在 fork() 子进程之后调用 ev_loop_fork() 函数。

#include "ev.h"
#include <stdio.h>

static void fork_callback(EV_P_ ev_fork *w, int revents)
{
        (void) w;
        (void) revents;

        printf("[%d] fork callback\n", getpid());
}

static void timeout_callback(EV_P_ ev_timer *w,int revents)
{
        (void) w;
        (void) revents;

        printf("[%d] time out\n", getpid());
        //ev_break(EV_A_ EVBREAK_ALL);
}

int main(void)
{
        EV_P EV_DEFAULT;
        ev_fork wfork;
        ev_timer wtimer;

        ev_fork_init(&wfork, fork_callback);
        ev_fork_start(EV_A_ &wfork);

        ev_timer_init(&wtimer, timeout_callback, 1., 1.);
        ev_timer_start(EV_A_ &wtimer);

        pid_t pid;

        pid = fork();
        if (pid < 0) {
                return -1;
        } else if (pid == 0) {
                printf("[%d] Child\n", getpid());
                //ev_loop_fork(EV_A_);
                ev_run(EV_A_ 0);
                ev_loop_destroy(EV_A_);
                return 0;
        }

        printf("[%d] Parent\n", getpid());

        ev_run(EV_A_ 0);
        ev_loop_destroy(EV_A_);

        return 0;
}

在如上的示例中,会在子进程中重新执行,所以最好的方式是,如果不需要最好直接关闭。

另外,在创建 epoll 对象时,入参使用了 EPOLL_CLOEXEC 参数,也就意味着在 fork 进程时会自动关闭文件描述符。

Child Watcher

fork 一个新进程,给它安装一个 child 处理器等待进程结束,实际上会等待接受 SIGCHLD 信号,然后调用相应的事件。

ev_child cw;
static void child_cb (EV_P_ ev_child *w, int revents)
{
	ev_child_stop (EV_A_ w);
	printf ("process %d exited with status %x\n", w->rpid, w->rstatus);
}
pid_t pid = fork ();
if (pid < 0) {            // error
	perror("fork()");
	exit(EXIT_FAILURE);
} else if (pid == 0) {    // child
	// the forked child executes here
	sleep(1);
	exit (EXIT_SUCCESS);
} else {                  // parent
	ev_child_init (&cw, child_cb, pid, 0);
	ev_child_start (EV_DEFAULT_ &cw);
}

实际上,是通过注册一个 SIGCHILD 信号进行处理的,其回调函数是 childcb

Filestat Watcher

监控 Makefile 是否有变化,可以通过修改文件触发事件。

static void filestat_cb (struct ev_loop *loop, ev_stat *w, int revents)
{
    // "Makefile" changed in some way
    if (w->attr.st_nlink) {
        printf ("Makefile current size  %ld\n", (long)w->attr.st_size);
        printf ("Makefile current atime %ld\n", (long)w->attr.st_mtime);
        printf ("Makefile current mtime %ld\n", (long)w->attr.st_mtime);
    } else { /* you shalt not abuse printf for puts */
        puts ("wow, Makefile is not there, expect problems. "
              "if this is windows, they already arrived\n");
    }
}
ev_stat makefile;
ev_stat_init (&makefile, filestat_cb, "Makefile", 0.);
ev_stat_start (loop, &makefile);

其它

也就是 ev_prepareev_checkev_idle,这三个类型的实际上是事件循环的扩展。

  • ev_prepare 在事件循环发生阻塞前会被触发。

信号处理

Linux 中的信号时异步发生的,一般是从内核态切换到用户态时进行检查,从而从用户代码角度看,就是异步处理。

采用的是将异步信号同步化处理,同步化方案有,signalfdeventfdpipesigwaitinfo 等,这里采用的是前三种,将对异步信号的处理,转化成对文件描述符的处理,也就是将 ev_signal 转化为处理 ev_io ;而最后一种,需要单独起一个信号处理线程。

源码解析

使用示例如下。

#include <stdio.h>
#include <libev/ev.h>

static void sigint_cb (EV_P_ ev_signal *w, int revents)
{
        puts("catch SIGINT");
        ev_break (EV_A_ EVBREAK_ALL);
}

int main (void)
{
        EV_P EV_DEFAULT;
        static ev_signal signal_watcher;

        ev_signal_init (&signal_watcher, sigint_cb, SIGINT);
        ev_signal_start(EV_A_ &signal_watcher);

        ev_loop(EV_A_ 0);

        return 0;
}

数据结构

对应的结构体展开后的成员对象如下:

typedef struct ev_signal {  
	int active;
	int pending;
	int priority;
	void *data;
	void (*cb)(EV_P_ struct ev_signal *w, int revents);
	struct ev_watcher_list *next;
	int signum;
} ev_signal;  

包括 cb 在内之前的都是比较标准的成员,其中 signum 记录了信号量,成员结构体通过 list 链接。另外,在 ev.c 内部,通过 ANSIG 结构体维护了一个数组结构,用来组织 ev_signal 结构体。

typedef struct {
    sig_atomic_t volatile pending;   // 信号处于未决状态,也就是触发但尚未处理
#if EV_MULTIPLICITY
    struct ev_loop *loop;
#endif
    ev_watcher_list *head;           // 该信号所注册的信号处理回调函数
} ANSIG;
static ANSIG signals [EV_NSIG - 1];

signals 是 ANSIG 类型的数组,它的下标就是相应的信号值 - 1,也就是说,每个信号都有对应的 ANSIG 结构。

信号同步处理

在 Linux 平台上,libev 信号同步机制采用的顺序为:signalfd、eventfd、pipe 。

signalfd

signalfd 是最简单方便的信号同步机制,可以很容易的将异步的信号的监听转化成对文件描述符的监听。

下面首先看一下使用 signalfd 时的信号处理流程,其函数声明为。

#include <sys/signalfd.h>
int signalfd(int fd, const sigset_t*mask, intflags);

参数:
    fd: -1 生成新文件描述符;或者指定存在有效的 fd ,而 mask 会替换掉之前相关联的信号集。
    mask: 这个文件描述符接受的信号集,可以通过sigsetops()宏初始化。

函数使用示例如下:

#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/signalfd.h>

#define handle_error(msg) do {           \
        perror(msg); exit(EXIT_FAILURE); \
} while (0)

int main(void)
{
        int sfd;
        ssize_t rc;
        struct signalfd_siginfo fdsi;

        sigset_t mask;
        sigemptyset(&mask);
        sigaddset(&mask, SIGINT);
        sigaddset(&mask, SIGQUIT);

        if (sigprocmask(SIG_BLOCK, &mask, NULL) == -1)
                handle_error("sigprocmask");

        if ((sfd = signalfd(-1, &mask, 0)) == -1)
                handle_error("signalfd");

        while(1) {
                rc = read(sfd, &fdsi, sizeof(struct signalfd_siginfo));
                if (rc != sizeof(struct signalfd_siginfo))
                        handle_error("read");

                if (fdsi.ssi_signo == SIGINT) {
                        printf("Got SIGINT\n");
                } else if (fdsi.ssi_signo == SIGQUIT) {
                        printf("Got SIGQUIT\n");
                        exit(EXIT_SUCCESS);
                } else {
                        printf("Read unexpected signal\n");
                }
        }

        return 0;
}

杂项

代码优化

libev 可以通过很多宏进行调优,默认会通过 EV_FEATURES 宏定义一些特性,定义如下。

#ifndef EV_FEATURES
# if defined __OPTIMIZE_SIZE__
#  define EV_FEATURES 0x7c  /* 0111 1100 */
# else
#  define EV_FEATURES 0x7f  /* 0111 1111 */
# endif
#endif

#define EV_FEATURE_CODE     ((EV_FEATURES) &  1) /* 0000 0001 */
#define EV_FEATURE_DATA     ((EV_FEATURES) &  2) /* 0000 0010 */
#define EV_FEATURE_CONFIG   ((EV_FEATURES) &  4) /* 0000 0100 */
#define EV_FEATURE_API      ((EV_FEATURES) &  8) /* 0000 1000 */
#define EV_FEATURE_WATCHERS ((EV_FEATURES) & 16) /* 0001 0000 */
#define EV_FEATURE_BACKENDS ((EV_FEATURES) & 32) /* 0010 0000 */
#define EV_FEATURE_OS       ((EV_FEATURES) & 64) /* 0100 0000 */

内存分配

可以看到很多数组会通过 array_needsize() 函数分配内存,简单来说,为了防止频繁申请内存,每次都会尝试申请 MALLOC_ROUND 宏指定大小的内存,一般是 4K 。

如下是在 ev_timer_start() 函数中的使用方法。

array_needsize(ANHE, timers, timermax, ev_active (w) + 1, EMPTY2);

简单来说,ANHE 表示数组中的成员类型;timers 表示数组的基地址;timermax 表示当其值,因为可能会预分配一部分内存,所以在分配完成后,同时会将真正分配的内存数返回;ev_active(w)+1 表示需要申请的大小。

在分配内存时,默认会采用 realloc() 函数,如果想要自己定义,可以通过 ev_set_allocator() 函数进行设置。

处理回调

触发的事件会通过 ev_feed_event() 函数将相关的事件保存到一个二维 pendings 数组中,也就是说该数组记录了所有已经触发的事件,其中第一个维度是优先级,而第二个维度是已经触发的事件。

pendings[PRI][NUMS];
pendingmax[PRI]; 最大数组
pendingcnt[PRI]; 当前事件数

参考

源码可以从 freenode - libev 上下载,不过最近的更新是 2011 年,也可以从 github 上下载,或者下载 本地保存版本 libev-4.22;帮助文档可以参考 本地文档

对于 python ,提供了相关的扩展 Python libev interface - pyev

魅族内核团队的相关文章,一篇介绍内核如何实现信号处理,Linux Signal



如果喜欢这里的文章,而且又不差钱的话,欢迎打赏个早餐 ^_^


About This Blog

Recent Posts

Categories

Related Links

  • RTEMS
    RTEMS
  • GNU
  • Linux Kernel
  • Arduino

Search


This Site was built by Jin Yang, generated with Jekyll, and hosted on GitHub Pages
©2013-2018 – Jin Yang