设为首页 加入收藏

TOP

浅析libuv源码-node事件轮询解析(4)(一)
2019-09-17 18:36:34 】 浏览:41
Tags:浅析 libuv 源码 -node 事件 解析

  这篇应该能结,简图如下。

  上一篇讲到了uv__work_submit方法,接着写了。

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  // 上篇主要讲的这里 初始化线程池等
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}

  从post开始。

static void post(QUEUE* q, enum uv__work_kind kind) {
  // 因为存在队列插入操作 需要加锁
  uv_mutex_lock(&mutex);
  if (kind == UV__WORK_SLOW_IO) {
    //跳...
  }

  QUEUE_INSERT_TAIL(&wq, q);
  // 如果有空闲线程 唤醒
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}

  wq就是上一篇讲的线程都会用到的那个队列,这里负责插入任务,worker中取出任务。

  没想到post到这里没了,这点东西并到上一篇就好了。以后写这种系列博客还是先规划一下,不能边看源码边写……

  函数到这里就断了,看似没有线索,实际上在上一节的worker方法中,还漏了一个地方。

static void worker(void* arg) {
  // ...
  for (;;) {
    // 这里调用内部fs方法处理任务
    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL; 
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    // 这个是漏了的关键
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    // ...
  }
}

  每一条线程在每次处理完一条事务并将其插入工作队列wq后,都会调用一下这个uv_async_send方法,上一篇没讲这个。

  这里的wq_async是一个在loop上面的变量,在轮询初始化的时候出现过,这里先不看。

  uv_async_send这个方法又涉及到另外一个大模块,如下。

int uv_async_send(uv_async_t* handle) {
  // 错误处理...
  if (!uv__atomic_exchange_set(&handle->async_sent)) {
    POST_COMPLETION_FOR_REQ(loop, &handle->async_req);
  }

  return 0;
}

// 将操作结果推到iocp上面
#define POST_COMPLETION_FOR_REQ(loop, req)                              \
  if (!PostQueuedCompletionStatus((loop)->iocp,                         \
                                  0,                                    \
                                  0,                                    \
                                  &((req)->u.io.overlapped))) {         \
    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");       \
  }

  这个地方说实话我并不是明白windows底层API的操作原理,IOCP这部分我没有去研究,只能从字面上去理解。

  关于PostXXX方法官网解释如下:

Posts an I/O completion packet to an I/O completion port.

  将一个I/O完成的数据打包到I/O完成的端口,翻译过来就是这样,个人理解上的话大概是把一个async_req丢到IOCP那里保存起来。

 

  接下来终于可以回到事件轮询部分,点题了。

int uv_run(uv_loop_t *loop, uv_run_mode mode) {
  // ...

  while (r != 0 && loop->stop_flag == 0) {
    // ...
    // call pending callbacks
    ran_pending = uv_process_reqs(loop);
    // ...
    // poll for I/O
    if (pGetQueuedCompletionStatusEx)
      uv__poll(loop, timeout);
    else
      uv__poll_wine(loop, timeout);
    // ...
  }
  // ...
}

  截取了剩下的poll for I/O、call pending callback,也就是剩下的两部分了。if判断不用管,只是一个方法兼容,最终的目的是一样的。

  所以只看uv__poll部分。

static void uv__poll(uv_loop_t* loop, DWORD timeout) {
  // ...
  // 设定阻塞时间
  uint64_t timeout_time;
  timeout_time = loop->time + timeout;

  for (repeat = 0; ; repeat++) {
    success = GetQueuedCompletionStatusEx(loop->iocp,
                                          overlappeds,
                                          ARRAY_SIZE(overlappeds),
                                          &count,
                                          timeout,
                                          FALSE);

    if (success) {
      for (i = 0; i < count; i++) {
        if (overlappeds[i].lpOverlapped) {
          req = uv_overlapped_to_req(overlappeds[i].lpOverlapped);
          uv_insert_pending_req(loop, req);
        }
      }
      uv_update_time(loop);
    } else if (GetLastError() != WAIT_TIMEOUT) {
      // ...
    } else if (timeout > 0) {
      // 超时处理...
    }
    break;
  }
}

  这里的GetQueueXXX方法与之前的PostQueueXXX正好是一对方法,都是基于IOCP,一个是存储,一个是取出。

  遍历操作就很容易懂了,取出数据后,一个个的塞到pending callback的队列中。

  把uv_insert_pending_req、uv_process

首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇EggJs快速入门 下一篇4.图片左轮播图(swiper)

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目