lvyilong316 阅读(63) 评论(0)

mergeable对接收端(guest)的影响

——lvyilong316

这里再分析一下guest内部对于开启mergeable接收会有什么影响,顺便分析一下开启GUEST_GSO/GUEST_TSO时,guest内部的接收流程。

首先我们从vhost-user,发送端分析一下,两种情况是如何更新used->ring的。

●  mergeable情况

reserve_avail_buf_mergeable(dpdk代码)函数中有以下逻辑:

点击(此处)折叠或打开

  1. /*fill_vec_buf的作用是找一个desc chain,用来存放mbuf,然后buf_vec记录这些desc的信息*/
  2.                    if (unlikely(fill_vec_buf(dev, vq, cur_idx, &vec_idx, buf_vec,
  3.                                                         &head_idx, &len) < 0))
  4.                             return -1;
  5.                    len = RTE_MIN(len, size);
  6.                    update_shadow_used_ring(vq, head_idx, len);


    这里我们在“vhost_user mergeable特性”中已经分析过,fill_vec_buf是遍历当前desc chain,然后将这个chain的信息记录在buf_vec中,同时len中返回的是这个chain能存放的数据长度

    update_shadow_used_ring中会将这个长度赋值给vq->shadow_used_ring[i].len,如下:


点击(此处)折叠或打开

  1. static inline void __attribute__((always_inline))
  2. update_shadow_used_ring(struct vhost_virtqueue *vq,
  3.                              uint16_t desc_idx, uint16_t len)
  4. {
  5.          uint16_t i = vq->shadow_used_idx++;
  6.  
  7.          vq->shadow_used_ring[i].id = desc_idx;
  8.          vq->shadow_used_ring[i].len = len;
  9. }


最后在flush_shadow_used_ring中,vq->shadow_used_ring[i].len最终被赋值给vq->used->ring[i].len。也就是vq->used->ring[i].len存放的是一个chain的长度(更准确地说,由于前面有len = RTE_MIN(len, size),它是chain可存放长度与剩余待写入数据长度中的较小值)。

●  开启GUEST_GSO/GUEST_TSO(不开启mergeable)

这种情况vhost_user后端不会去特殊处理,和普通报文一样。在virtio_dev_rx的处理逻辑中有如下代码:


点击(此处)折叠或打开

  1. for (i = 0; i < count; i++) {
  2.                    used_idx = (start_idx + i) & (vq->size - 1);
  3.                    desc_indexes[i] = vq->avail->ring[used_idx];
  4.                    vq->used->ring[used_idx].id = desc_indexes[i];
  5.         /* vq->used->ring[used_idx].len 存放的是整个数据包长加上virtio header的长度*/
  6.                    vq->used->ring[used_idx].len = pkts[i]->pkt_len + dev->vhost_hlen;
  7.                    vhost_log_used_vring(dev, vq,
  8.                             offsetof(struct vring_used, ring[used_idx]),
  9.                             sizeof(vq->used->ring[used_idx]));
  10.          }


这里vq->used->ring[used_idx].len 存放的是整个数据包长加上virtio header的长度,因为在非mergeable情况,一个数据包要么被一个chain装完,要么丢弃,所以只要发送成功,就不存在一个chain只装了部分数据的情况。

下面我们再看guest接收端代码,即kernel(3.10)的virtio_net代码。以下是guest的收包逻辑:

●  virtnet_poll


点击(此处)折叠或打开

  1. static int virtnet_poll(struct napi_struct *napi, int budget)
  2. {
  3.     /*……*/
  4.          while (received < budget && /*virtqueue_get_buf取出要接收的skb*/
  5.                 (buf = virtqueue_get_buf(rq->vq, &len)) != NULL) {
  6.                    receive_buf(rq, buf, len); /*真正的接收处理操作,最终调用netif_receive_skb*/
  7.                    --rq->num;
  8.                    received++;
  9.          }
  10.     /*……*/
  11. }


我们只看和我们分析有关的逻辑。首先调用virtqueue_get_buf从队列中取出一个buf(guest侧对应的是之前放入队列的skb或page,而不是dpdk的mbuf),并返回一个长度len。


点击(此处)折叠或打开

  1. void *virtqueue_get_buf(struct virtqueue *_vq, unsigned int *len)
  2. {
  3.          struct vring_virtqueue *vq = to_vvq(_vq);
  4.          void *ret;
  5.          unsigned int i;
  6.          u16 last_used;
  7.     /*……*/
  8.          virtio_rmb(vq->weak_barriers);
  9.     /*获取本次要使用的used_elem数组index*/
  10.          last_used = (vq->last_used_idx & (vq->vring.num - 1));
  11.          i = vq->vring.used->ring[last_used].id; /*本次要接受skb对应的data下标,也是skb对应第一个desc的index*/
  12.          *len = vq->vring.used->ring[last_used].len;/*本次要接受skb的长度*/
  13.  
  14.          /* detach_buf clears data, so grab it now. */
  15.          /*取出要接受的skb*/
  16.          ret = vq->data[i];
  17.          /*释放skb对应的desc chain*/
  18.          detach_buf(vq, i);
  19.          vq->last_used_idx++;
  20.          /* If we expect an interrupt for the next entry, tell host
  21.           * by writing event index and flush out the write before
  22.           * the read in the next get_buf call. */
  23.          if (!(vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
  24.                    vring_used_event(&vq->vring) = vq->last_used_idx;
  25.                    virtio_mb(vq->weak_barriers);
  26.          }
  27.  
  28.          END_USE(vq);
  29.          return ret;
  30. }


这里注意以下几点:

(1)     返回的len存放的是vq->vring.used->ring[last_used].len中的值,上面我们分析过,在mergeable情况下这是一个chain的长度(如果数据包的长度小于chain能装的数据长度,则为数据包的长度+virtio header),在GUEST_TSO*的情况,这是一个数据包的长度+virtio header

(2)     detach_buf 会释放当前desc chain,而不仅是一个desc,因为无论哪种情况,这个chain中的数据在之后都会被取出,可以归还给后端了。

●  receive_buf


点击(此处)折叠或打开

  1. static void receive_buf(struct receive_queue *rq, void *buf, unsigned int len)
  2. {
  3.          struct virtnet_info *vi = rq->vq->vdev->priv;
  4.          struct net_device *dev = vi->dev;
  5.          struct virtnet_stats *stats = this_cpu_ptr(vi->stats);
  6.          struct sk_buff *skb;
  7.          struct skb_vnet_hdr *hdr;
  8.     /*……*/
  9.          if (vi->mergeable_rx_bufs)
  10.                    skb = receive_mergeable(dev, rq, buf, len);
  11.          else if (vi->big_packets)
  12.                    skb = receive_big(dev, rq, buf);
  13.          else
  14.                    skb = receive_small(buf, len);
  15.  
  16.          if (unlikely(!skb))
  17.                    return;
  18.  
  19.          hdr = skb_vnet_hdr(skb);
  20.  
  21.          u64_stats_update_begin(&stats->rx_syncp);
  22.          stats->rx_bytes += skb->len;
  23.          stats->rx_packets++;
  24.          u64_stats_update_end(&stats->rx_syncp);
  25.  
  26.          if (hdr->hdr.flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
  27.                    pr_debug("Needs csum!\n");
  28.                    if (!skb_partial_csum_set(skb,
  29.                                                  hdr->hdr.csum_start,
  30.                                                  hdr->hdr.csum_offset))
  31.                             goto frame_err;
  32.          } else if (hdr->hdr.flags & VIRTIO_NET_HDR_F_DATA_VALID) {
  33.                    skb->ip_summed = CHECKSUM_UNNECESSARY;
  34.          }
  35.  
  36.          skb->protocol = eth_type_trans(skb, dev);
  37.          pr_debug("Receiving skb proto 0x%04x len %i type %i\n",
  38.                     ntohs(skb->protocol), skb->len, skb->pkt_type);
  39.     /*根据后端填入virtio_net_hdr中的信息,设置gso的相关字段,说明收到的是大包*/
  40.          if (hdr->hdr.gso_type != VIRTIO_NET_HDR_GSO_NONE) {
  41.                    pr_debug("GSO!\n");
  42.                    switch (hdr->hdr.gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
  43.                    case VIRTIO_NET_HDR_GSO_TCPV4:
  44.                             skb_shinfo(skb)->gso_type = SKB_GSO_TCPV4;
  45.                             break;
  46.                    case VIRTIO_NET_HDR_GSO_UDP:
  47.                             skb_shinfo(skb)->gso_type = SKB_GSO_UDP;
  48.                             break;
  49.                    case VIRTIO_NET_HDR_GSO_TCPV6:
  50.                             skb_shinfo(skb)->gso_type = SKB_GSO_TCPV6;
  51.                             break;
  52.                    default:
  53.                             net_warn_ratelimited("%s: bad gso type %u.\n",
  54.                                                     dev->name, hdr->hdr.gso_type);
  55.                             goto frame_err;
  56.                    }
  57.  
  58.                    if (hdr->hdr.gso_type & VIRTIO_NET_HDR_GSO_ECN)
  59.                             skb_shinfo(skb)->gso_type |= SKB_GSO_TCP_ECN;
  60.  
  61.                    skb_shinfo(skb)->gso_size = hdr->hdr.gso_size;
  62.                    if (skb_shinfo(skb)->gso_size == 0) {
  63.                             net_warn_ratelimited("%s: zero gso size.\n", dev->name);
  64.                             goto frame_err;
  65.                    }
  66.  
  67.                    /* Header must be checked, and gso_segs computed. */
  68.                    skb_shinfo(skb)->gso_type |= SKB_GSO_DODGY;
  69.                    skb_shinfo(skb)->gso_segs = 0;
  70.          }
  71.     /*发往协议栈*/
  72.          netif_receive_skb(skb);
  73.          return;
  74.  
  75. frame_err:
  76.          dev->stats.rx_frame_errors++;
  77.          dev_kfree_skb(skb);
  78. }


     如果mergeable开启,则vi->mergeable_rx_bufs会被置位,如果GUEST_TSO*被打开,则vi->big_packets会被置位。所以分析两种情况的接收处理就是看相应的调用函数,即receive_mergeable和receive_big。在分析这两个函数前,首先来看receive_buf的后半部分,根据后端填入virtio_net_hdr中的信息,设置gso(这里用于收方向,即gro)的相关字段。所以要想guest具备接收大包(LRO)的功能,不但需要开启相关flag(GUEST_TSO*或mergeable),还依赖后端对virtio header的设置:如果后端处理了切割大包逻辑,以链表形式给前端,并设置相应virtio header,则guest就可以不用再分片;否则如果后端没有处理分片,仅仅把大包发给guest,则guest还需要进行GRO处理。下面分析receive_mergeable和receive_big。

●  receive_mergeable

点击(此处)折叠或打开

  1. static struct sk_buff *receive_mergeable(struct net_device *dev,
  2.                                                 struct receive_queue *rq,
  3.                                                 void *buf,
  4.                                                 unsigned int len)
  5. {
  6.     /*从第一个page中获取到virtio header*/
  7.          struct skb_vnet_hdr *hdr = page_address(buf);
  8.          /*从virtio header中获取这个数据包所用的desc chain个数*/
  9.          int num_buf = hdr->mhdr.num_buffers;
  10.          struct page *page = buf;
  11.          /*将page中的数据转换为skb*/
  12.          struct sk_buff *skb = page_to_skb(rq, page, len);
  13.          int i;
  14.  
  15.          if (unlikely(!skb))
  16.                    goto err_skb;
  17.  
  18.          while (--num_buf) { /*对应当前数据包使用的每个chain*/
  19.                    i = skb_shinfo(skb)->nr_frags;
  20.                    if (i >= MAX_SKB_FRAGS) {
  21.                             pr_debug("%s: packet too long\n", skb->dev->name);
  22.                             skb->dev->stats.rx_length_errors++;
  23.                             goto err_frags;
  24.                    }
  25.         /*对接下来的每个desc chain 再次调用virtqueue_get_buf */
  26.                    page = virtqueue_get_buf(rq->vq, &len);
  27.                    if (!page) {
  28.                             pr_debug("%s: rx error: %d buffers %d missing\n",
  29.                                       dev->name, hdr->mhdr.num_buffers, num_buf);
  30.                             dev->stats.rx_length_errors++;
  31.                             goto err_buf;
  32.                    }
  33.  
  34.                    if (len > PAGE_SIZE)
  35.                             len = PAGE_SIZE;
  36.  
  37.                    set_skb_frag(skb, page, 0, &len);
  38.  
  39.                    --rq->num;
  40.          }
  41.          return skb;
  42. }
   可以看出mergeable的情况,由于一个数据包可能使用多个chain,则会对每个chain再次调用virtqueue_get_buf,获取对应page(mergeable的情况每个chain的长度为1,对应的也是一个page),然后通过set_skb_frag将之后的每个chain(desc)对应的page加入首个skb的skb_shinfo(skb)->frags[i]中。所以mergeable情况收到的大包,会有skb_shinfo(skb)->frags[],其后续的每个desc对应skb_shinfo(skb)->frags[]中的一个page。下面看receive_big。


●  receive_big

点击(此处)折叠或打开

  1. static struct sk_buff *receive_big(struct net_device *dev,
  2.                                         struct receive_queue *rq,
  3.                                         void *buf)
  4. {
  5.          struct page *page = buf;
  6.          struct sk_buff *skb = page_to_skb(rq, page, 0);
  7.          return skb;
  8. }

直接调用page_to_skb,这个在receive_mergeable中也有调用。

●  page_to_skb

点击(此处)折叠或打开

  1. static struct sk_buff *page_to_skb(struct receive_queue *rq,
  2.                                         struct page *page, unsigned int len)
  3. {
  4.          struct virtnet_info *vi = rq->vq->vdev->priv;
  5.          struct sk_buff *skb;
  6.          struct skb_vnet_hdr *hdr;
  7.          unsigned int copy, hdr_len, offset;
  8.          char *p;
  9.  
  10.          p = page_address(page);
  11.  
  12.          /* copy small packet so we can reuse these pages for small data */
  13.          skb = netdev_alloc_skb_ip_align(vi->dev, GOOD_COPY_LEN);
  14.          if (unlikely(!skb))
  15.                    return NULL;
  16.  
  17.          hdr = skb_vnet_hdr(skb);
  18.  
  19.          if (vi->mergeable_rx_bufs) {
  20.                    hdr_len = sizeof hdr->mhdr;
  21.                    offset = hdr_len;
  22.          } else {
  23.                    hdr_len = sizeof hdr->hdr;
  24.                    offset = sizeof(struct padded_vnet_hdr);
  25.          }
  26.     /*提取virtio header*/
  27.          memcpy(hdr, p, hdr_len);
  28.  
  29.          len -= hdr_len;
  30.          p += offset;
  31.     /*将剩余数据尽可能的拷贝到当前的skb线性区中*/
  32.          copy = len;
  33.          if (copy > skb_tailroom(skb))
  34.                    copy = skb_tailroom(skb);
  35.          memcpy(skb_put(skb, copy), p, copy);
  36.  
  37.          len -= copy;
  38.          offset += copy;
  39.  
  40.          /*
  41.           * Verify that we can indeed put this data into a skb.
  42.           * This is here to handle cases when the device erroneously
  43.           * tries to receive more than is possible. This is usually
  44.           * the case of a broken device.
  45.           */
  46.          if (unlikely(len > MAX_SKB_FRAGS * PAGE_SIZE)) {
  47.                    net_dbg_ratelimited("%s: too much data\n", skb->dev->name);
  48.                    dev_kfree_skb(skb);
  49.                    return NULL;
  50.          }
  51.     /*如果第一个skb的线性区用完了,但是还有数据没拷贝出来,则添加到skb_shinfo(skb)->frags[]*/
  52.          while (len) {
  53.                    set_skb_frag(skb, page, offset, &len);
  54.                    page = (struct page *)page->private;
  55.                    offset = 0;
  56.          }
  57.  
  58.          if (page)
  59.                    give_pages(rq, page);
  60.  
  61.          return skb;
  62. }

从上面的过程总结一下:当开启GUEST_TSO*时,guest收大包会尽可能的填充skb的线性区,剩余数据填充skb_shinfo(skb)->frags[],而对于mergeable由于只有第一个chain(也就是一个desc)对应的page会填充skb线性区,其他数据都在skb_shinfo(skb)->frags[],所以mergeable可能会有更多frags

另外注意一点,当mergeable和GUEST_TSO*同时开启时,由于guest是优先判断mergeable的,所以就会走mergeable逻辑。

在实现LRO时,建议使用mergeable特性,因为如果使用GUEST_TSO*,则接收小包也会使用长为17的desc chain,这会造成浪费。