lvyilong316 阅读(32) 评论(0)

 dpdk中断机制

 ——lvyilong316

这里主要介绍一下dpdk的中断机制,虽然dpdk大多数场景用的是polling模式,但是也是支持中断模式的,另一方面除了收发包之外,设备的其他功能,如状态改变等,还是要依赖中断机制。当然dpdk的中断是用户态的中断,实现方式是通过vfio/uio模块将内核的中断传递到用户态,具体vfio/uio的工作方式不是本文的重点,这里重点关注dpdk的中断处理流程。首先看一下dpdk中断处理相关的初始化流程。

3.5.1 中断初始化

rte_eal_init → rte_eal_intr_init

中断初始化主要在rte_eal_intr_init中完成。

●  rte_eal_intr_init

rte_eal_intr_init()函数中初始化中断。具体如下:

(1)     首先初始化intr_sources链表。所有设备的中断都挂在这个链表上,中断处理线程通过遍历这个链表,来执行设备的中断处理。

(2)     创建intr_pipe管道,用于epoll模型的消息通知。

(3)     创建线程intr_thread,线程的执行体是eal_intr_thread_main()函数,创建epoll模型,遍历intr_sources链表,监听已注册的所有UIO设备的中断事件,并调用对应UIO设备的中断处理函数。

点击(此处)折叠或打开

  1. int rte_eal_intr_init(void)
  2. {
  3.          int ret = 0, ret_1 = 0;
  4.          char thread_name[RTE_MAX_THREAD_NAME_LEN];
  5.  
  6.          /* init the global interrupt source head */
  7.          /*初始化intr_sources全局链表,用来存放设备的中断资源*/
  8.          TAILQ_INIT(&intr_sources);
  9.  
  10.          /**
  11.           * create a pipe which will be waited by epoll and notified to
  12.           * rebuild the wait list of epoll.
  13.           */
  14.           /*创建管道,返回的两个fd存放在全局变量intr_pipe中*/
  15.          if (pipe(intr_pipe.pipefd) < 0)
  16.                    return -1;
  17.     /*创建中断处理线程*/
  18.          /* create the host thread to wait/handle the interrupt */
  19.          ret = pthread_create(&intr_thread, NULL,
  20.                             eal_intr_thread_main, NULL);
  21.          if (ret != 0) {
  22.                    RTE_LOG(ERR, EAL,
  23.                             "Failed to create thread for interrupt handling\n");
  24.          } else {
  25.                    /* Set thread_name for aid in debugging. */
  26.                    snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
  27.                             "eal-intr-thread");
  28.                    ret_1 = rte_thread_setname(intr_thread, thread_name);
  29.                    if (ret_1 != 0)
  30.                             RTE_LOG(DEBUG, EAL,
  31.                             "Failed to set thread name for interrupt handling\n");
  32.          }
  33.  
  34.          return -ret;
  35. }

    在继续分析之前先看下intr_sources这个全局链表的样子,如下图所示:

链表由struct rte_intr_source结构组成,每个struct rte_intr_source结构描述一个设备的中断信息。而struct rte_intr_source中又有三个重要成员:

●  intr_handle

点击(此处)折叠或打开

  1. struct rte_intr_handle {
  2.          RTE_STD_C11
  3.          union {
  4.                    int vfio_dev_fd; /**< VFIO device file descriptor */
  5.                    int uio_cfg_fd; /**< UIO config file descriptor
  6.                                                for uio_pci_generic */
  7.          };
  8.          int fd; /**< interrupt event file descriptor */
  9.          enum rte_intr_handle_type type; /**< handle type */
  10.          uint32_t max_intr; /* nb_efd+1 */
  11.          uint32_t nb_efd; /* efds中有效的个数 */
  12.          int efds[RTE_MAX_RXTX_INTR_VEC_ID]; /*传递中断的fd,每个队列一个 */
  13.          struct rte_epoll_event elist[RTE_MAX_RXTX_INTR_VEC_ID];
  14.                                             /**< intr vector epoll event */
  15.          int *intr_vec; /**< intr vector number array,每个队列ring 的offset*/
  16. };

这个结构用来记录设备中断的相关信息,其中主要是设备每个队列对应的传递中断的fd(如uio/vfio暴露给用户态的文件打开后得到的fd)。当然较新的dpdk(如18.05)虚拟设备也可以支持中断,如vhost_user后端设备。如果对vhost_user设备的rte_intr_handle进行初始化,可以如下进行:

点击(此处)折叠或打开

  1. static int
  2. eth_vhost_install_intr(struct rte_eth_dev *dev)
  3. {
  4.          struct rte_vhost_vring vring;
  5.          struct vhost_queue *vq;
  6.          int count = 0;
  7.          int nb_rxq = dev->data->nb_rx_queues;
  8.          int i;
  9.          int ret;
  10.  
  11.          dev->intr_handle = malloc(sizeof(*dev->intr_handle));
  12.          memset(dev->intr_handle, 0, sizeof(*dev->intr_handle));
  13.          dev->intr_handle->intr_vec =
  14.                    malloc(nb_rxq * sizeof(dev->intr_handle->intr_vec[0]));
  15.  
  16.          for (i = 0; i < nb_rxq; i++) {
  17.                    vq = dev->data->rx_queues[i];
  18.                    if (!vq)
  19.                             continue;
  20.                    ret = rte_vhost_get_vhost_vring(vq->vid, i << 1, &vring);
  21.                    dev->intr_handle->intr_vec[i] = RTE_INTR_VEC_RXTX_OFFSET + i;
  22.                    dev->intr_handle->efds[i] = vring.callfd; /*对于vhost_user设备这里就使用callfd接收来自前端的中断*/
  23.                    count++;
  24.          }
  25.  
  26.          dev->intr_handle->nb_efd = count;
  27.          dev->intr_handle->max_intr = count + 1;
  28.          dev->intr_handle->type = RTE_INTR_HANDLE_VDEV;
  29.  
  30.          return 0;
  31. }

●  callbacks

这是一个rte_intr_callback结构组成的链表,主要保存设备的中断处理函数和参数信息。为什么要一个链表呢?因为可以对一个中断注册多个处理函数。

●  active

描述设备中断的状态,即设备上是否有尚未处理完的中断。

 

下面来看eal_intr_thread_main函数,也就是中断线程的主体函数。

●  eal_intr_thread_main

中断线程执行主体eal_intr_thread_main()函数具体如下:

(1)     epoll_create()创建epoll模型。

(2)     intr_pipe管道加入到epoll中。

(3)     遍历intr_sources链表,将所有UIO设备加入到epoll中。

(4)     执行eal_intr_handle_interrupts()函数。

●  eal_intr_thread_main

点击(此处)折叠或打开

  1. static __attribute__((noreturn)) void * eal_intr_thread_main(__rte_unused void *arg)
  2. {
  3.          struct epoll_event ev;
  4.  
  5.          /* host thread, never break out */
  6.          for (;;) {
  7.                    /* build up the epoll fd with all descriptors we are to
  8.                     * wait on then pass it to the handle_interrupts function
  9.                     */
  10.                    static struct epoll_event pipe_event = {
  11.                             .events = EPOLLIN | EPOLLPRI,
  12.                    };
  13.                    struct rte_intr_source *src;
  14.                    unsigned numfds = 0;
  15.  
  16.                    /* create epoll fd */
  17.                    int pfd = epoll_create(1);
  18.                    if (pfd < 0)
  19.                             rte_panic("Cannot create epoll instance\n");
  20.         /*intr_pipe是一个全局变量,在rte_eal_intr_init中已经初始化*/
  21.                    pipe_event.data.fd = intr_pipe.readfd;
  22.                    /**
  23.                     * add pipe fd into wait list, this pipe is used to
  24.                     * rebuild the wait list.
  25.                     */
  26.                     /*将intr_pipe.readfd添加到epoll的监听列表*/
  27.                    if (epoll_ctl(pfd, EPOLL_CTL_ADD, intr_pipe.readfd,
  28.                                                         &pipe_event) < 0) {
  29.                             rte_panic("Error adding fd to %d epoll_ctl, %s\n",
  30.                                                intr_pipe.readfd, strerror(errno));
  31.                    }
  32.                    numfds++;
  33.  
  34.                    rte_spinlock_lock(&intr_lock);
  35.         /*遍历intr_sources链表,将所有设备的中断通知fd加入到epoll中*/
  36.                    TAILQ_FOREACH(src, &intr_sources, next) {
  37.                             if (src->callbacks.tqh_first == NULL)
  38.                                      continue; /* skip those with no callbacks */
  39.                             ev.events = EPOLLIN | EPOLLPRI;
  40.                             ev.data.fd = src->intr_handle.fd;
  41.  
  42.                             /**
  43.                              * add all the uio device file descriptor
  44.                              * into wait list.
  45.                              */
  46.                             if (epoll_ctl(pfd, EPOLL_CTL_ADD,
  47.                                                src->intr_handle.fd, &ev) < 0){
  48.                                      rte_panic("Error adding fd %d epoll_ctl, %s\n",
  49.                                                src->intr_handle.fd, strerror(errno));
  50.                             }
  51.                             else
  52.                                      numfds++;
  53.                    }
  54.                    rte_spinlock_unlock(&intr_lock);
  55.                    /* serve the interrupt */
  56.                    eal_intr_handle_interrupts(pfd, numfds);
  57.  
  58.                    /**
  59.                     * when we return, we need to rebuild the
  60.                     * list of fds to monitor.
  61.                     */
  62.                    close(pfd);
  63.          }
  64. }

    然后函数调用eal_intr_handle_interrupts

●  eal_intr_handle_interrupts

    eal_intr_handle_interrupts主要就是在死循环中调用epoll_wait,然后处理中断。

点击(此处)折叠或打开

  1. static void eal_intr_handle_interrupts(int pfd, unsigned totalfds)
  2. {
  3.          struct epoll_event events[totalfds];
  4.          int nfds = 0;
  5.  
  6.          for(;;) {
  7.                    nfds = epoll_wait(pfd, events, totalfds,
  8.                             EAL_INTR_EPOLL_WAIT_FOREVER);
  9.                    /* epoll_wait fail */
  10.                    if (nfds < 0) {
  11.                             if (errno == EINTR)
  12.                                      continue;
  13.                             RTE_LOG(ERR, EAL,
  14.                                      "epoll_wait returns with fail\n");
  15.                             return;
  16.                    }
  17.                    /* epoll_wait timeout, will never happens here */
  18.                    else if (nfds == 0)
  19.                             continue;
  20.                    /* epoll_wait has at least one fd ready to read */
  21.         /* 注意只有这里返回小于0,这个无限循环才会退出 */
  22.                    if (eal_intr_process_interrupts(events, nfds) < 0)
  23.                             return;
  24.          }
  25. }

这个函数在一个for(;;)死循环中,调用epoll_wait()阻塞模式监听事件。如果有事件发生,则调用eal_intr_process_interrupts()函数。

●  eal_intr_process_interrupts

点击(此处)折叠或打开

  1. static int eal_intr_process_interrupts(struct epoll_event *events, int nfds)
  2. {
  3.          int n, bytes_read;
  4.          struct rte_intr_source *src;
  5.          struct rte_intr_callback *cb;
  6.          union rte_intr_read_buffer buf;
  7.          struct rte_intr_callback active_cb;
  8.  
  9.          for (n = 0; n < nfds; n++) {
  10.  
  11.                    /**
  12.                     * if the pipe fd is ready to read, return out to
  13.                     * rebuild the wait list.
  14.                     */
  15.                     /*如果是pipefd有数据,说明有新注册的中断,返回-1让上层退出无限循环,重新扫描intr_sources 链表,添加中断fd*/
  16.                    if (events[n].data.fd == intr_pipe.readfd){
  17.                             int r = read(intr_pipe.readfd, buf.charbuf,
  18.                                                sizeof(buf.charbuf));
  19.                             RTE_SET_USED(r);
  20.                             return -1;
  21.                    }
  22.                    rte_spinlock_lock(&intr_lock);
  23.                    /*遍历intr_sources 链表,处理上面的中断*/
  24.                    TAILQ_FOREACH(src, &intr_sources, next)
  25.                             if (src->intr_handle.fd ==
  26.                                                events[n].data.fd) /*判断设备是否产生了中断*/
  27.                                      break;
  28.                    if (src == NULL){
  29.                             rte_spinlock_unlock(&intr_lock);
  30.                             continue;
  31.                    }
  32.  
  33.                    /* mark this interrupt source as active and release the lock. */
  34.                    src->active = 1; /*表明这个设备的中断尚未处理*/
  35.                    rte_spinlock_unlock(&intr_lock);
  36.  
  37.                    /* set the length to be read dor different handle type */
  38.                    /*根据中断设备的类型,UIO或者vfio等,设置要读取数据的大小*/
  39.                    switch (src->intr_handle.type) {
  40.                    case RTE_INTR_HANDLE_UIO:
  41.                    case RTE_INTR_HANDLE_UIO_INTX:
  42.                             bytes_read = sizeof(buf.uio_intr_count);
  43.                             break;
  44.                    case RTE_INTR_HANDLE_ALARM:
  45.                             bytes_read = sizeof(buf.timerfd_num);
  46.                             break;
  47. #ifdef VFIO_PRESENT
  48.                    case RTE_INTR_HANDLE_VFIO_MSIX:
  49.                    case RTE_INTR_HANDLE_VFIO_MSI:
  50.                    case RTE_INTR_HANDLE_VFIO_LEGACY:
  51.                             bytes_read = sizeof(buf.vfio_intr_count);
  52.                             break;
  53. #endif
  54.                    case RTE_INTR_HANDLE_EXT:
  55.                    default:
  56.                             bytes_read = 1;
  57.                             break;
  58.                    }
  59.                    /*从uio或vfio中断设备中读取中断数据*/
  60.                    if (src->intr_handle.type != RTE_INTR_HANDLE_EXT) {
  61.                             /**
  62.                              * read out to clear the ready-to-be-read flag
  63.                              * for epoll_wait.
  64.                              */
  65.                             bytes_read = read(events[n].data.fd, &buf, bytes_read);
  66.                             if (bytes_read < 0) {
  67.                                      if (errno == EINTR || errno == EWOULDBLOCK)
  68.                                                continue;
  69.  
  70.                                      RTE_LOG(ERR, EAL, "Error reading from file "
  71.                                                "descriptor %d: %s\n",
  72.                                                events[n].data.fd,
  73.                                                strerror(errno));
  74.                             } else if (bytes_read == 0)
  75.                                      RTE_LOG(ERR, EAL, "Read nothing from file "
  76.                                                "descriptor %d\n", events[n].data.fd);
  77.                    }
  78.  
  79.                    /* grab a lock, again to call callbacks and update status. */
  80.                    rte_spinlock_lock(&intr_lock);
  81.         /*调用中断设备自己的中断处理函数*/
  82.                    if (bytes_read > 0) {
  83.  
  84.                             /* Finally, call all callbacks. */
  85.             /* 注意是调用这个设备注册的所有中断处理函数 */
  86.                             TAILQ_FOREACH(cb, &src->callbacks, next) {
  87.  
  88.                                      /* make a copy and unlock. */
  89.                                      active_cb = *cb;
  90.                                      rte_spinlock_unlock(&intr_lock);
  91.  
  92.                                      /* call the actual callback */
  93.                                      active_cb.cb_fn(&src->intr_handle,
  94.                                                active_cb.cb_arg);
  95.  
  96.                                      /*get the lock back. */
  97.                                      rte_spinlock_lock(&intr_lock);
  98.                             }
  99.                    }
  100.  
  101.                    /* we done with that interrupt source, release it. */
  102.                    src->active = 0; /*处理完中断后清除设备中断状态*/
  103.                    rte_spinlock_unlock(&intr_lock);
  104.          }
  105.  
  106.          return 0;
  107. }

到此设备中断的相关初始化就结束了,整个过程如下所示:

3.5.2 设备中断注册

那么中断又是怎么注册的呢?这就不得不提rte_intr_callback_register这个函数,设备的中断处理都是通过这个函数注册的,我们看下他的实现。

●  rte_intr_callback_register

点击(此处)折叠或打开

  1. int
  2. rte_intr_callback_register(const struct rte_intr_handle *intr_handle,
  3.                             rte_intr_callback_fn cb, void *cb_arg)
  4. {
  5.          int ret, wake_thread;
  6.          struct rte_intr_source *src;
  7.          struct rte_intr_callback *callback;
  8.  
  9.          wake_thread = 0;
  10.  
  11.          /* first do parameter checking */
  12.          if (intr_handle == NULL || intr_handle->fd < 0 || cb == NULL) {
  13.                    RTE_LOG(ERR, EAL,
  14.                             "Registering with invalid input parameter\n");
  15.                    return -EINVAL;
  16.          }
  17.  
  18.          /* allocate a new interrupt callback entity */
  19.          callback = rte_zmalloc("interrupt callback list",
  20.                                      sizeof(*callback), 0);
  21.          if (callback == NULL) {
  22.                    RTE_LOG(ERR, EAL, "Can not allocate memory\n");
  23.                    return -ENOMEM;
  24.          }
  25.          /* 初始化callback */
  26.          callback->cb_fn = cb;
  27.          callback->cb_arg = cb_arg;
  28.  
  29.          rte_spinlock_lock(&intr_lock);
  30.  
  31.          /* check if there is at least one callback registered for the fd */
  32.          /* 遍历intr_sources链表,找对应的rte_intr_source */
  33.          TAILQ_FOREACH(src, &intr_sources, next) {
  34.                    if (src->intr_handle.fd == intr_handle->fd) {
  35.                             /* we had no interrupts for this */
  36.                    /* 如果这个设备的这个中断之前没有注册过处理函数,则需要唤醒中断处理线程,将这个中断fd添加到epoll中 */
  37.                             if TAILQ_EMPTY(&src->callbacks)
  38.                                      wake_thread = 1;
  39.         /* 如果这个中断已经有对应的处理函数了,说明已经在epoll中了,则只需要把新的callback加入链表 */
  40.                             TAILQ_INSERT_TAIL(&(src->callbacks), callback, next);
  41.                             ret = 0;
  42.                             break;
  43.                    }
  44.          }
  45.     /* 如果没有设备对应的rte_intr_source结构,则创建一个并添加到全局链表 */
  46.          /* no existing callbacks for this - add new source */
  47.          if (src == NULL) {
  48.                    if ((src = rte_zmalloc("interrupt source list",
  49.                                      sizeof(*src), 0)) == NULL) {
  50.                             RTE_LOG(ERR, EAL, "Can not allocate memory\n");
  51.                             rte_free(callback);
  52.                             ret = -ENOMEM;
  53.                    } else {
  54.                             src->intr_handle = *intr_handle;
  55.                             TAILQ_INIT(&src->callbacks);
  56.                             TAILQ_INSERT_TAIL(&(src->callbacks), callback, next);
  57.                             TAILQ_INSERT_TAIL(&intr_sources, src, next);
  58.                             wake_thread = 1;
  59.                             ret = 0;
  60.                    }
  61.          }
  62.  
  63.          rte_spinlock_unlock(&intr_lock);
  64.  
  65.          /**
  66.           * check if need to notify the pipe fd waited by epoll_wait to
  67.           * rebuild the wait list.
  68.           */
  69.          if (wake_thread) /* 唤醒中断处理线程 */
  70.                    if (write(intr_pipe.writefd, "1", 1) < 0)
  71.                             return -EPIPE;
  72.  
  73.          return ret;
  74. }

这个函数主要是为中断创建一个rte_intr_source结构,我们从其参数可以看出来,参数正是rte_intr_source结构成员所需要的,然后将rte_intr_source结构加入全局链表intr_sources中,并通知前面创建的中断处理线程,中断处理线程可以再次遍历intr_sources,将新加入的rte_intr_source中的handle->fd加入epoll中处理。整个处理流程如下所示。

下面列举一个uio/vfio设备的中断回调函数注册的完整路径,以ixgbevf为例:

rte_eth_dev_pci_probe → eth_ixgbevf_dev_init → rte_intr_callback_register,其中rte_eth_dev_pci_probe在下面的“绑定驱动”中会介绍。

对于ixgbevf,其调用如下,这次注册的中断处理函数为ixgbevf_dev_interrupt_handler:

          rte_intr_callback_register(intr_handle,ixgbevf_dev_interrupt_handler, eth_dev);

●  ixgbevf_dev_interrupt_handler

点击(此处)折叠或打开

  1. static void ixgbevf_dev_interrupt_handler(__rte_unused struct rte_intr_handle *handle,
  2.                                   void *param)
  3. {
  4.          struct rte_eth_dev *dev = (struct rte_eth_dev *)param;
  5.     /*暂时先禁止中断*/
  6.          ixgbevf_dev_interrupt_get_status(dev);
  7.          ixgbevf_dev_interrupt_action(dev);
  8. }

●  ixgbevf_dev_interrupt_action

其中主要是SRIOV设备,mailbox的处理,这里不再展开。

点击(此处)折叠或打开

  1. static int ixgbevf_dev_interrupt_action(struct rte_eth_dev *dev)
  2. {
  3.          struct ixgbe_hw *hw = IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);
  4.          struct ixgbe_interrupt *intr =
  5.                    IXGBE_DEV_PRIVATE_TO_INTR(dev->data->dev_private);
  6.  
  7.          if (intr->flags & IXGBE_FLAG_MAILBOX) {
  8.                    ixgbevf_mbx_process(dev);
  9.                    intr->flags &= ~IXGBE_FLAG_MAILBOX;
  10.          }
  11.     /*开启中断*/
  12.          ixgbevf_intr_enable(hw);
  13.  
  14.          return 0;
  15. }

3.5.3 接收队列中断注册

我们上面讲了设备的中断注册,但是上面所说的中断注册一般不是数据中断,而是控制中断,比如设备状态改变等情况。这种中断我们一般会设置intr_handle->fd,如上面的描述。但是如果我们想要注册设备的接收队列中断(rxq interrupt)呢?由于设备可能是多队列,那么显然一个fd是不够的,所以我们可以像上面为vhost_user设备注册中断一样(eth_vhost_install_intr),使用intr_handle->efds这个数组为每个rxq设置一个中断fd。但是这就有个问题,我们在“中断初始化”中分析eal_intr_thread_main时讲过,中断处理线程仅会将intr_handle->fd加入epoll中,但是并不会添加intr_handle->efds。那我们设置的intr_handle->efds该怎么使用呢?其实这就涉及到数据面的中断注册了,一个非常好的例子是dpdk代码中的examples\l3fwd-power。

普通的DPDK是采用的PMD模式,也就是轮询模式,这种模式下无论是否有报文处理,都是采用轮询,也就是CPU占用率100%。l3fwd-power就是为了解决这个问题,当CPU根本就不需要处理报文的时候进入省电模式也就是中断模式。我们这里只关注其中的中断注册,其他暂时不去分析。设备的rxq中断是从event_register注册的。

●  event_register

点击(此处)折叠或打开

  1. static int event_register(struct lcore_conf *qconf)
  2. {
  3.          struct lcore_rx_queue *rx_queue;
  4.          uint8_t portid, queueid;
  5.          uint32_t data;
  6.          int ret;
  7.          int i;
  8.     /* 为设备的每个接收队列调用rte_eth_dev_rx_intr_ctl_q注册中断 */
  9.          for (i = 0; i < qconf->n_rx_queue; ++i) {
  10.                    rx_queue = &(qconf->rx_queue_list[i]);
  11.                    portid = rx_queue->port_id;
  12.                    queueid = rx_queue->queue_id;
  13.                    data = portid << CHAR_BIT | queueid;
  14.  
  15.                    ret = rte_eth_dev_rx_intr_ctl_q(portid, queueid,
  16.                                                         RTE_EPOLL_PER_THREAD,
  17.                                                         RTE_INTR_EVENT_ADD,
  18.                                                         (void *)((uintptr_t)data));
  19.                    if (ret)
  20.                             return ret;
  21.          }
  22.  
  23.          return 0;
  24. }

其中注册每个rxq的中断由rte_eth_dev_rx_intr_ctl_q函数完成,注意RTE_EPOLL_PER_THREAD的值为-1。

●  rte_eth_dev_rx_intr_ctl_q

点击(此处)折叠或打开

  1. int rte_eth_dev_rx_intr_ctl_q(uint8_t port_id, uint16_t queue_id,
  2.                               int epfd, int op, void *data)
  3. {
  4.          uint32_t vec;
  5.          struct rte_eth_dev *dev;
  6.          struct rte_intr_handle *intr_handle;
  7.          int rc;
  8.  
  9.          RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
  10.     /* 根据port_id找到对应的struct rte_eth_dev */
  11.          dev = &rte_eth_devices[port_id];
  12.          if (queue_id >= dev->data->nb_rx_queues) {
  13.                    RTE_PMD_DEBUG_TRACE("Invalid RX queue_id=%u\n", queue_id);
  14.                    return -EINVAL;
  15.          }
  16.     /* 检查设备是否初始化了intr_handle */
  17.          if (!dev->intr_handle) {
  18.                    RTE_PMD_DEBUG_TRACE("RX Intr handle unset\n");
  19.                    return -ENOTSUP;
  20.          }
  21.   
  22.          intr_handle = dev->intr_handle;
  23.          if (!intr_handle->intr_vec) {
  24.                    RTE_PMD_DEBUG_TRACE("RX Intr vector unset\n");
  25.                    return -EPERM;
  26.          }
  27.          /* intr_handle->intr_vec[queue_id]为queue的ring idx */
  28.          vec = intr_handle->intr_vec[queue_id];
  29.          rc = rte_intr_rx_ctl(intr_handle, epfd, op, vec, data);
  30.          if (rc && rc != -EEXIST) {
  31.                    RTE_PMD_DEBUG_TRACE("p %u q %u rx ctl error"
  32.                                      " op %d epfd %d vec %u\n",
  33.                                      port_id, queue_id, op, epfd, vec);
  34.                    return rc;
  35.          }
  36.  
  37.          return 0;
  38. }

这个函数进行了一系列检查,最终调用rte_intr_rx_ctl完成中断fd注册。在看rte_intr_rx_ctl实现之前,先看下rte_intr_handle的之前没展开的细节结构,如下所示。

●  rte_intr_rx_ctl

点击(此处)折叠或打开

  1. int
  2. rte_intr_rx_ctl(struct rte_intr_handle *intr_handle, int epfd,
  3.                    int op, unsigned int vec, void *data)
  4. {
  5.          struct rte_epoll_event *rev;
  6.          struct rte_epoll_data *epdata;
  7.          int epfd_op;
  8.          unsigned int efd_idx;
  9.          int rc = 0;
  10.    
  11.          efd_idx = (vec >= RTE_INTR_VEC_RXTX_OFFSET) ?
  12.                    (vec - RTE_INTR_VEC_RXTX_OFFSET) : vec;
  13.  
  14.          if (!intr_handle || intr_handle->nb_efd == 0 ||
  15.              efd_idx >= intr_handle->nb_efd) {
  16.                    RTE_LOG(ERR, EAL, "Wrong intr vector number.\n");
  17.                    return -EPERM;
  18.          }
  19.  
  20.          switch (op) {
  21.          case RTE_INTR_EVENT_ADD:
  22.                    epfd_op = EPOLL_CTL_ADD;
  23.                    rev = &intr_handle->elist[efd_idx];
  24.         /* rev->status != RTE_EPOLL_INVALID说明这个中断fd已经加入了epoll了 */
  25.                    if (rev->status != RTE_EPOLL_INVALID) {
  26.                             RTE_LOG(INFO, EAL, "Event already been added.\n");
  27.                             return -EEXIST;
  28.                    }
  29.         /* 设置intr_handle->elist[efd_idx].epdata */
  30.                    /* attach to intr vector fd */
  31.                    epdata = &rev->epdata;
  32.                    epdata->event = EPOLLIN | EPOLLPRI | EPOLLET;
  33.                    epdata->data = data;
  34.                    epdata->cb_fun = (rte_intr_event_cb_t)eal_intr_proc_rxtx_intr;
  35.                    epdata->cb_arg = (void *)intr_handle;
  36.                    /* 注意这里传入的是intr_handle->efds[efd_idx] */
  37.                    rc = rte_epoll_ctl(epfd, epfd_op,
  38.                                         intr_handle->efds[efd_idx], rev);
  39.                    if (!rc)
  40.                             RTE_LOG(DEBUG, EAL,
  41.                                      "efd %d associated with vec %d added on epfd %d"
  42.                                      "\n", rev->fd, vec, epfd);
  43.                    else
  44.                             rc = -EPERM;
  45.                    break;
  46.          case RTE_INTR_EVENT_DEL:
  47.                    epfd_op = EPOLL_CTL_DEL;
  48.                    rev = &intr_handle->elist[efd_idx];
  49.                    if (rev->status == RTE_EPOLL_INVALID) {
  50.                             RTE_LOG(INFO, EAL, "Event does not exist.\n");
  51.                             return -EPERM;
  52.                    }
  53.  
  54.                    rc = rte_epoll_ctl(rev->epfd, epfd_op, rev->fd, rev);
  55.                    if (rc)
  56.                             rc = -EPERM;
  57.                    break;
  58.          default:
  59.                    RTE_LOG(ERR, EAL, "event op type mismatch\n");
  60.                    rc = -EPERM;
  61.          }
  62.  
  63.          return rc;
  64. }

由于是中断注册,我们只关注RTE_INTR_EVENT_ADD的逻辑,这里我们终于看到了intr_handle->efds[efd_idx],通过rte_epoll_ctl进行注册,同时我们也看到了这里会初始化一个中断处理函数eal_intr_proc_rxtx_intr,这个我们后面分析。

●  rte_epoll_ctl

点击(此处)折叠或打开

  1. int
  2. rte_epoll_ctl(int epfd, int op, int fd,
  3.                struct rte_epoll_event *event)
  4. {
  5.          struct epoll_event ev;
  6.  
  7.          if (!event) {
  8.                    RTE_LOG(ERR, EAL, "rte_epoll_event can't be NULL\n");
  9.                    return -1;
  10.          }
  11.  
  12.          /* using per thread epoll fd */
  13.          /*如果epfd为-1,则创建epollfd,注意这里把epollfd存放在了“每线程变量中”*/
  14.          if (epfd == RTE_EPOLL_PER_THREAD)
  15.                    epfd = rte_intr_tls_epfd();
  16.  
  17.          if (op == EPOLL_CTL_ADD) {
  18.                    event->status = RTE_EPOLL_VALID;
  19.                    event->fd = fd; /* ignore fd in event */
  20.                    event->epfd = epfd;
  21.                    ev.data.ptr = (void *)event;
  22.          }
  23.  
  24.          ev.events = event->epdata.event;
  25.          /*添加到epoll中*/
  26.          if (epoll_ctl(epfd, op, fd, &ev) < 0) {
  27.                    RTE_LOG(ERR, EAL, "Error op %d fd %d epoll_ctl, %s\n",
  28.                             op, fd, strerror(errno));
  29.                    if (op == EPOLL_CTL_ADD)
  30.                             /* rollback status when CTL_ADD fail */
  31.                             event->status = RTE_EPOLL_INVALID;
  32.                    return -1;
  33.          }
  34.  
  35.          if (op == EPOLL_CTL_DEL && event->status != RTE_EPOLL_INVALID)
  36.                    eal_epoll_data_safe_free(event);
  37.  
  38.          return 0;
  39. }

这个函数主要就是创建一个per thread的epollfd,然后调用了epoll_ctl来将rxq的fd加入epollfd。到此中断注册就完成了。下面我们看中断回调过程。整个中断线程就是dataplane的主线程。具体不再展开,调用路径如下所示。

这里我们主要看一下rte_epoll_wait的处理逻辑,之所以要对epoll_wait进行一次封装,主要是在epoll_wait返回后调用了eal_epoll_process_event。

●  rte_epoll_wait

点击(此处)折叠或打开

  1. int rte_epoll_wait(int epfd, struct rte_epoll_event *events,
  2.                 int maxevents, int timeout)
  3. {
  4.          struct epoll_event evs[maxevents];
  5.          int rc;
  6.  
  7.          if (!events) {
  8.                    RTE_LOG(ERR, EAL, "rte_epoll_event can't be NULL\n");
  9.                    return -1;
  10.          }
  11.  
  12.          /* using per thread epoll fd */
  13.          /* 获取之前创建的epollfd */
  14.          if (epfd == RTE_EPOLL_PER_THREAD)
  15.                    epfd = rte_intr_tls_epfd();
  16.  
  17.          while (1) {
  18.                    rc = epoll_wait(epfd, evs, maxevents, timeout);
  19.                    if (likely(rc > 0)) {
  20.                             /* epoll_wait has at least one fd ready to read */
  21.                             rc = eal_epoll_process_event(evs, rc, events);
  22.                             break;
  23.                    } else if (rc < 0) {
  24.                             if (errno == EINTR)
  25.                                      continue;
  26.                             /* epoll_wait fail */
  27.                             RTE_LOG(ERR, EAL, "epoll_wait returns with fail %s\n",
  28.                                      strerror(errno));
  29.                             rc = -1;
  30.                             break;
  31.                    } else {
  32.                             /* rc == 0, epoll_wait timed out */
  33.                             break;
  34.                    }
  35.          }
  36.  
  37.          return rc;
  38. }

●  eal_epoll_process_event

点击(此处)折叠或打开

  1. static int
  2. eal_epoll_process_event(struct epoll_event *evs, unsigned int n,
  3.                             struct rte_epoll_event *events)
  4. {
  5.          unsigned int i, count = 0;
  6.          struct rte_epoll_event *rev;
  7.  
  8.          for (i = 0; i < n; i++) {
  9.                    rev = evs[i].data.ptr;
  10.                    if (!rev || !rte_atomic32_cmpset(&rev->status, RTE_EPOLL_VALID,
  11.                                                          RTE_EPOLL_EXEC))
  12.                             continue;
  13.  
  14.                    events[count].status = RTE_EPOLL_VALID;
  15.                    events[count].fd = rev->fd;
  16.                    events[count].epfd = rev->epfd;
  17.                    events[count].epdata.event = rev->epdata.event;
  18.                    events[count].epdata.data = rev->epdata.data;
  19.                    if (rev->epdata.cb_fun)
  20.                             rev->epdata.cb_fun(rev->fd,
  21.                                                   rev->epdata.cb_arg);
  22.  
  23.                    rte_compiler_barrier();
  24.                    rev->status = RTE_EPOLL_VALID;
  25.                    count++;
  26.          }
  27.          return count;
  28. }

eal_epoll_process_event的主要逻辑就是调用之前rte_intr_rx_ctl中注册的epdata.cb_fun,也就是eal_intr_proc_rxtx_intr。

●  eal_intr_proc_rxtx_intr

这个函数其实主要就是读出fd中的数据,以免下次调用epoll_wait时直接返回。当然这是dpdk 17.02的实现,在18.05中加入了RTE_INTR_HANDLE_VDEV,也就是之前我们注册vhost_user时使用的handle type,对应RTE_INTR_HANDLE_VDEV是不需要从fd读数据的,所以bytes_read为0。

点击(此处)折叠或打开

  1. static void eal_intr_proc_rxtx_intr(int fd, const struct rte_intr_handle *intr_handle)
  2. {
  3.          union rte_intr_read_buffer buf;
  4.          int bytes_read = 1;
  5.          int nbytes;
  6.  
  7.          switch (intr_handle->type) {
  8.          case RTE_INTR_HANDLE_UIO:
  9.          case RTE_INTR_HANDLE_UIO_INTX:
  10.                    bytes_read = sizeof(buf.uio_intr_count);
  11.                    break;
  12. #ifdef VFIO_PRESENT
  13.          case RTE_INTR_HANDLE_VFIO_MSIX:
  14.          case RTE_INTR_HANDLE_VFIO_MSI:
  15.          case RTE_INTR_HANDLE_VFIO_LEGACY:
  16.                    bytes_read = sizeof(buf.vfio_intr_count);
  17.                    break;
  18. #endif
  19.          default:
  20.                    bytes_read = 1;
  21.                    RTE_LOG(INFO, EAL, "unexpected intr type\n");
  22.                    break;
  23.          }
  24.  
  25.          /**
  26.           * read out to clear the ready-to-be-read flag
  27.           * for epoll_wait.
  28.           */
  29.          do {
  30.                    nbytes = read(fd, &buf, bytes_read);
  31.                    if (nbytes < 0) {
  32.                             if (errno == EINTR || errno == EWOULDBLOCK ||
  33.                                 errno == EAGAIN)
  34.                                      continue;
  35.                             RTE_LOG(ERR, EAL,
  36.                                      "Error reading from fd %d: %s\n",
  37.                                      fd, strerror(errno));
  38.                    } else if (nbytes == 0)
  39.                             RTE_LOG(ERR, EAL, "Read nothing from fd %d\n", fd);
  40.                    return;
  41.          } while (1);
  42. }

    之后就返回主线程了,主线程函数在rte_epoll_wait返回后调用收包逻辑处理。

下面是整个中断注册回调逻辑图。