1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  * Copyright 2018 Joyent, Inc.
  25  */
  26 
  27 /*
  28  * General Soft rings - Simulating Rx rings in S/W.
  29  *
  30  * Soft ring is a data abstraction containing a queue and a worker
  31  * thread and represents a hardware Rx ring in software. Each soft
  32  * ring set can have a collection of soft rings for separating
  33  * L3/L4 specific traffic (IPv4 from IPv6 or TCP from UDP) or for
  34  * allowing a higher degree of parallelism by sending traffic to
  35  * one of the soft rings for a SRS (using a hash on src IP or port).
  36  * Each soft ring worker thread can be bound to a different CPU
  37  * allowing the processing for each soft ring to happen in parallel
  38  * and independent from each other.
  39  *
  40  * Protocol soft rings:
  41  *
 * Each SRS has at a minimum 3 softrings. One each for IPv4 TCP,
  43  * IPv4 UDP and rest (OTH - for IPv6 and everything else). The
  44  * SRS does dynamic polling and enforces link level bandwidth but
  45  * it does so for all traffic (IPv4 and IPv6 and all protocols) on
  46  * that link. However, each protocol layer wants a different
  47  * behaviour. For instance IPv4 TCP has per CPU squeues which
  48  * enforce their own polling and flow control so IPv4 TCP traffic
  49  * needs to go to a separate soft ring which can be polled by the
  50  * TCP squeue. It also allows TCP squeue to push back flow control
  51  * all the way to NIC hardware (if it puts its corresponding soft
  52  * ring in the poll mode and soft ring queue builds up, the
  53  * shared srs_poll_pkt_cnt goes up and SRS automatically stops
  54  * more packets from entering the system).
  55  *
  56  * Similarly, the UDP benefits from a DLS bypass and packet chaining
  57  * so sending it to a separate soft ring is desired. All the rest of
 * the traffic (including IPv6) is sent to the OTH softring. The IPv6
 * traffic currently goes through the OTH softring and via DLS because
 * it needs more processing to be done. Irrespective of the sap
  61  * (IPv4 or IPv6) or the transport, the dynamic polling, B/W enforcement,
  62  * cpu assignment, fanout, etc apply to all traffic since they
 * are implemented by the SRS which is agnostic to sap or transport.
  64  *
  65  * Fanout soft rings:
  66  *
  67  * On a multithreaded system, we can assign more CPU and multi thread
  68  * the stack by creating a soft ring per CPU and spreading traffic
  69  * based on a hash computed on src IP etc. Since we still need to
 * keep the protocol separation, we create a set of 3 soft rings per
  71  * CPU (specified by cpu list or degree of fanout).
  72  *
  73  * NOTE: See the block level comment on top of mac_sched.c
  74  */
  75 
  76 #include <sys/types.h>
  77 #include <sys/callb.h>
  78 #include <sys/sdt.h>
  79 #include <sys/strsubr.h>
  80 #include <sys/strsun.h>
  81 #include <sys/vlan.h>
  82 #include <inet/ipsec_impl.h>
  83 #include <inet/ip_impl.h>
  84 #include <inet/sadb.h>
  85 #include <inet/ipsecesp.h>
  86 #include <inet/ipsecah.h>
  87 
  88 #include <sys/mac_impl.h>
  89 #include <sys/mac_client_impl.h>
  90 #include <sys/mac_soft_ring.h>
  91 #include <sys/mac_flow_impl.h>
  92 #include <sys/mac_stat.h>
  93 
  94 static void mac_rx_soft_ring_drain(mac_soft_ring_t *);
  95 static void mac_soft_ring_fire(void *);
  96 static void mac_soft_ring_worker(mac_soft_ring_t *);
  97 static void mac_tx_soft_ring_drain(mac_soft_ring_t *);
  98 
  99 uint32_t mac_tx_soft_ring_max_q_cnt = 100000;
 100 uint32_t mac_tx_soft_ring_hiwat = 1000;
 101 
 102 extern kmem_cache_t *mac_soft_ring_cache;
 103 
 104 #define ADD_SOFTRING_TO_SET(mac_srs, softring) {                        \
 105         if (mac_srs->srs_soft_ring_head == NULL) {                   \
 106                 mac_srs->srs_soft_ring_head = softring;                      \
 107                 mac_srs->srs_soft_ring_tail = softring;                      \
 108         } else {                                                        \
 109                 /* ADD to the list */                                   \
 110                 softring->s_ring_prev =                                      \
 111                         mac_srs->srs_soft_ring_tail;                 \
 112                 mac_srs->srs_soft_ring_tail->s_ring_next = softring;      \
 113                 mac_srs->srs_soft_ring_tail = softring;                      \
 114         }                                                               \
 115         mac_srs->srs_soft_ring_count++;                                      \
 116 }
 117 
 118 /*
 119  * mac_soft_ring_worker_wakeup
 120  *
 121  * Wake up the soft ring worker thread to process the queue as long
 122  * as no one else is processing it and upper layer (client) is still
 123  * ready to receive packets.
 124  */
 125 void
 126 mac_soft_ring_worker_wakeup(mac_soft_ring_t *ringp)
 127 {
 128         ASSERT(MUTEX_HELD(&ringp->s_ring_lock));
 129         if (!(ringp->s_ring_state & S_RING_PROC) &&
 130             !(ringp->s_ring_state & S_RING_BLANK) &&
 131             (ringp->s_ring_tid == NULL)) {
 132                 if (ringp->s_ring_wait != 0) {
 133                         ringp->s_ring_tid =
 134                             timeout(mac_soft_ring_fire, ringp,
 135                             ringp->s_ring_wait);
 136                 } else {
 137                         /* Schedule the worker thread. */
 138                         cv_signal(&ringp->s_ring_async);
 139                 }
 140         }
 141 }
 142 
 143 /*
 144  * mac_soft_ring_create
 145  *
 146  * Create a soft ring, do the necessary setup and bind the worker
 147  * thread to the assigned CPU.
 148  */
 149 mac_soft_ring_t *
 150 mac_soft_ring_create(int id, clock_t wait, uint16_t type,
 151     pri_t pri, mac_client_impl_t *mcip, mac_soft_ring_set_t *mac_srs,
 152     processorid_t cpuid, mac_direct_rx_t rx_func, void *x_arg1,
 153     mac_resource_handle_t x_arg2)
 154 {
 155         mac_soft_ring_t         *ringp;
 156         char                    name[S_RING_NAMELEN];
 157 
 158         bzero(name, 64);
 159         ringp = kmem_cache_alloc(mac_soft_ring_cache, KM_SLEEP);
 160 
 161         if (type & ST_RING_TCP) {
 162                 (void) snprintf(name, sizeof (name),
 163                     "mac_tcp_soft_ring_%d_%p", id, (void *)mac_srs);
 164         } else if (type & ST_RING_UDP) {
 165                 (void) snprintf(name, sizeof (name),
 166                     "mac_udp_soft_ring_%d_%p", id, (void *)mac_srs);
 167         } else if (type & ST_RING_OTH) {
 168                 (void) snprintf(name, sizeof (name),
 169                     "mac_oth_soft_ring_%d_%p", id, (void *)mac_srs);
 170         } else {
 171                 ASSERT(type & ST_RING_TX);
 172                 (void) snprintf(name, sizeof (name),
 173                     "mac_tx_soft_ring_%d_%p", id, (void *)mac_srs);
 174         }
 175 
 176         bzero(ringp, sizeof (mac_soft_ring_t));
 177         (void) strncpy(ringp->s_ring_name, name, S_RING_NAMELEN + 1);
 178         ringp->s_ring_name[S_RING_NAMELEN] = '\0';
 179         mutex_init(&ringp->s_ring_lock, NULL, MUTEX_DEFAULT, NULL);
 180         ringp->s_ring_notify_cb_info.mcbi_lockp = &ringp->s_ring_lock;
 181 
 182         ringp->s_ring_type = type;
 183         ringp->s_ring_wait = MSEC_TO_TICK(wait);
 184         ringp->s_ring_mcip = mcip;
 185         ringp->s_ring_set = mac_srs;
 186 
 187         /*
 188          * Protect against access from DR callbacks (mac_walk_srs_bind/unbind)
 189          * which can't grab the mac perimeter
 190          */
 191         mutex_enter(&mac_srs->srs_lock);
 192         ADD_SOFTRING_TO_SET(mac_srs, ringp);
 193         mutex_exit(&mac_srs->srs_lock);
 194 
 195         /*
 196          * set the bind CPU to -1 to indicate
 197          * no thread affinity set
 198          */
 199         ringp->s_ring_cpuid = ringp->s_ring_cpuid_save = -1;
 200         ringp->s_ring_worker = thread_create(NULL, 0,
 201             mac_soft_ring_worker, ringp, 0, &p0, TS_RUN, pri);
 202         if (type & ST_RING_TX) {
 203                 ringp->s_ring_drain_func = mac_tx_soft_ring_drain;
 204                 ringp->s_ring_tx_arg1 = x_arg1;
 205                 ringp->s_ring_tx_arg2 = x_arg2;
 206                 ringp->s_ring_tx_max_q_cnt = mac_tx_soft_ring_max_q_cnt;
 207                 ringp->s_ring_tx_hiwat =
 208                     (mac_tx_soft_ring_hiwat > mac_tx_soft_ring_max_q_cnt) ?
 209                     mac_tx_soft_ring_max_q_cnt : mac_tx_soft_ring_hiwat;
 210                 if (mcip->mci_state_flags & MCIS_IS_AGGR_CLIENT) {
 211                         mac_srs_tx_t *tx = &mac_srs->srs_tx;
 212 
 213                         ASSERT(tx->st_soft_rings[
 214                             ((mac_ring_t *)x_arg2)->mr_index] == NULL);
 215                         tx->st_soft_rings[((mac_ring_t *)x_arg2)->mr_index] =
 216                             ringp;
 217                 }
 218         } else {
 219                 ringp->s_ring_drain_func = mac_rx_soft_ring_drain;
 220                 ringp->s_ring_rx_func = rx_func;
 221                 ringp->s_ring_rx_arg1 = x_arg1;
 222                 ringp->s_ring_rx_arg2 = x_arg2;
 223                 if (mac_srs->srs_state & SRS_SOFTRING_QUEUE)
 224                         ringp->s_ring_type |= ST_RING_WORKER_ONLY;
 225         }
 226         if (cpuid != -1)
 227                 (void) mac_soft_ring_bind(ringp, cpuid);
 228 
 229         mac_soft_ring_stat_create(ringp);
 230 
 231         return (ringp);
 232 }
 233 
 234 /*
 235  * mac_soft_ring_free
 236  *
 237  * Free the soft ring once we are done with it.
 238  */
void
mac_soft_ring_free(mac_soft_ring_t *softring)
{
	/*
	 * The ring must be fully torn down before it can be freed:
	 * condemned and condemned-done must both be set, and no drain
	 * (S_RING_PROC) may be in progress.
	 */
	ASSERT((softring->s_ring_state &
	    (S_RING_CONDEMNED | S_RING_CONDEMNED_DONE | S_RING_PROC)) ==
	    (S_RING_CONDEMNED | S_RING_CONDEMNED_DONE));
	/* Discard any packets still queued on the ring. */
	mac_pkt_drop(NULL, NULL, softring->s_ring_first, B_FALSE);
	softring->s_ring_tx_arg2 = NULL;
	mac_soft_ring_stat_delete(softring);
	mac_callback_free(softring->s_ring_notify_cb_list);
	kmem_cache_free(mac_soft_ring_cache, softring);
}
 251 
 252 int mac_soft_ring_thread_bind = 1;
 253 
 254 /*
 255  * mac_soft_ring_bind
 256  *
 257  * Bind a soft ring worker thread to supplied CPU.
 258  */
cpu_t *
mac_soft_ring_bind(mac_soft_ring_t *ringp, processorid_t cpuid)
{
	cpu_t *cp;
	boolean_t clear = B_FALSE;

	ASSERT(MUTEX_HELD(&cpu_lock));

	/* CPU binding can be disabled system-wide via this tunable. */
	if (mac_soft_ring_thread_bind == 0) {
		DTRACE_PROBE1(mac__soft__ring__no__cpu__bound,
		    mac_soft_ring_t *, ringp);
		return (NULL);
	}

	/* The target CPU must exist and be online. */
	cp = cpu_get(cpuid);
	if (cp == NULL || !cpu_is_online(cp))
		return (NULL);

	mutex_enter(&ringp->s_ring_lock);
	ringp->s_ring_state |= S_RING_BOUND;
	/* If already bound to some CPU, the old affinity must be dropped. */
	if (ringp->s_ring_cpuid != -1)
		clear = B_TRUE;
	ringp->s_ring_cpuid = cpuid;
	mutex_exit(&ringp->s_ring_lock);

	/*
	 * The affinity calls are made after dropping s_ring_lock; the
	 * caller-held cpu_lock (asserted above, and in
	 * mac_soft_ring_unbind()) serializes concurrent bind/unbind.
	 */
	if (clear)
		thread_affinity_clear(ringp->s_ring_worker);

	DTRACE_PROBE2(mac__soft__ring__cpu__bound, mac_soft_ring_t *,
	    ringp, processorid_t, cpuid);

	thread_affinity_set(ringp->s_ring_worker, cpuid);

	return (cp);
}
 294 
 295 /*
 296  * mac_soft_ring_unbind
 297  *
 * Unbind a soft ring worker thread.
 299  */
 300 void
 301 mac_soft_ring_unbind(mac_soft_ring_t *ringp)
 302 {
 303         ASSERT(MUTEX_HELD(&cpu_lock));
 304 
 305         mutex_enter(&ringp->s_ring_lock);
 306         if (!(ringp->s_ring_state & S_RING_BOUND)) {
 307                 ASSERT(ringp->s_ring_cpuid == -1);
 308                 mutex_exit(&ringp->s_ring_lock);
 309                 return;
 310         }
 311 
 312         ringp->s_ring_cpuid = -1;
 313         ringp->s_ring_state &= ~S_RING_BOUND;
 314         thread_affinity_clear(ringp->s_ring_worker);
 315         mutex_exit(&ringp->s_ring_lock);
 316 }
 317 
 318 /*
 319  * PRIVATE FUNCTIONS
 320  */
 321 
 322 static void
 323 mac_soft_ring_fire(void *arg)
 324 {
 325         mac_soft_ring_t *ringp = arg;
 326 
 327         mutex_enter(&ringp->s_ring_lock);
 328         if (ringp->s_ring_tid == NULL) {
 329                 mutex_exit(&ringp->s_ring_lock);
 330                 return;
 331         }
 332 
 333         ringp->s_ring_tid = NULL;
 334 
 335         if (!(ringp->s_ring_state & S_RING_PROC)) {
 336                 cv_signal(&ringp->s_ring_async);
 337         }
 338         mutex_exit(&ringp->s_ring_lock);
 339 }
 340 
 341 /*
 342  * Drain the soft ring pointed to by ringp.
 343  *
 344  *    o s_ring_first: pointer to the queued packet chain.
 345  *
 *    o s_ring_rx_func: pointer to the client's Rx routine.
 347  *
 348  *    o s_ring_rx_{arg1,arg2}: opaque values specific to the client.
 349  */
static void
mac_rx_soft_ring_drain(mac_soft_ring_t *ringp)
{
	mblk_t		*mp;
	void		*arg1;
	mac_resource_handle_t arg2;
	timeout_id_t	tid;
	mac_direct_rx_t proc;
	size_t		sz;
	int		cnt;
	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;

	ringp->s_ring_run = curthread;
	ASSERT(mutex_owned(&ringp->s_ring_lock));
	ASSERT(!(ringp->s_ring_state & S_RING_PROC));

	/* Take ownership of any pending wakeup timeout so we cancel it. */
	if ((tid = ringp->s_ring_tid) != NULL)
		ringp->s_ring_tid = NULL;

	/* Mark the ring busy so no other thread drains it concurrently. */
	ringp->s_ring_state |= S_RING_PROC;

	proc = ringp->s_ring_rx_func;
	arg1 = ringp->s_ring_rx_arg1;
	arg2 = ringp->s_ring_rx_arg2;

	while ((ringp->s_ring_first != NULL) &&
	    !(ringp->s_ring_state & S_RING_PAUSE)) {
		/*
		 * Detach the entire queued chain and zero the counters,
		 * then drop s_ring_lock so the client callback runs
		 * without the ring lock held.
		 */
		mp = ringp->s_ring_first;
		ringp->s_ring_first = NULL;
		ringp->s_ring_last = NULL;
		cnt = ringp->s_ring_count;
		ringp->s_ring_count = 0;
		sz = ringp->s_ring_size;
		ringp->s_ring_size = 0;
		mutex_exit(&ringp->s_ring_lock);

		/* Cancel the wakeup timeout (at most once). */
		if (tid != NULL) {
			(void) untimeout(tid);
			tid = NULL;
		}

		/* Deliver the chain to the client's receive routine. */
		(*proc)(arg1, arg2, mp, NULL);

		/*
		 * If we have an SRS performing bandwidth control, then
		 * we need to decrement the size and count so the SRS
		 * has an accurate measure of the data queued between
		 * the SRS and its soft rings. We decrement the
		 * counters only when the packet is processed by both
		 * the SRS and the soft ring.
		 */
		mutex_enter(&mac_srs->srs_lock);
		MAC_UPDATE_SRS_COUNT_LOCKED(mac_srs, cnt);
		MAC_UPDATE_SRS_SIZE_LOCKED(mac_srs, sz);
		mutex_exit(&mac_srs->srs_lock);

		/* Reacquire before re-testing the loop condition. */
		mutex_enter(&ringp->s_ring_lock);
	}
	ringp->s_ring_state &= ~S_RING_PROC;
	/* Wake any client thread waiting for this drain to finish. */
	if (ringp->s_ring_state & S_RING_CLIENT_WAIT)
		cv_signal(&ringp->s_ring_client_cv);
	ringp->s_ring_run = NULL;
}
 413 
 414 /*
 415  * The soft ring worker routine to process any queued packets. In
 416  * normal case, the worker thread is bound to a CPU. If the soft ring
 417  * handles TCP packets then the worker thread is bound to the same CPU
 418  * as the TCP squeue.
 419  */
static void
mac_soft_ring_worker(mac_soft_ring_t *ringp)
{
	kmutex_t *lock = &ringp->s_ring_lock;
	kcondvar_t *async = &ringp->s_ring_async;
	mac_soft_ring_set_t *srs = ringp->s_ring_set;
	callb_cpr_t cprinfo;

	/* Register with CPR (suspend/resume) before entering the loop. */
	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "mac_soft_ring");
	mutex_enter(lock);
start:
	for (;;) {
		/*
		 * Sleep while there is nothing to do (no packets queued,
		 * or the ring is blocked/blanked) and no pause has been
		 * requested, or while another thread is draining the
		 * ring (S_RING_PROC).
		 */
		while (((ringp->s_ring_first == NULL ||
		    (ringp->s_ring_state & (S_RING_BLOCK|S_RING_BLANK))) &&
		    !(ringp->s_ring_state & S_RING_PAUSE)) ||
		    (ringp->s_ring_state & S_RING_PROC)) {

			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}

		/*
		 * Either we have work to do, or we have been asked to
		 * shutdown temporarily or permanently
		 */
		if (ringp->s_ring_state & S_RING_PAUSE)
			goto done;

		ringp->s_ring_drain_func(ringp);
	}
done:
	/*
	 * Quiesce/condemn handshake.  srs_lock is acquired ahead of
	 * s_ring_lock, so drop s_ring_lock first and reacquire both in
	 * that order.
	 */
	mutex_exit(lock);
	mutex_enter(&srs->srs_lock);
	mutex_enter(lock);

	ringp->s_ring_state |= S_RING_QUIESCE_DONE;
	if (!(ringp->s_ring_state & S_RING_CONDEMNED)) {
		/*
		 * Quiesced but not condemned: announce completion to the
		 * SRS and wait until we are restarted or condemned.
		 */
		srs->srs_soft_ring_quiesced_count++;
		cv_broadcast(&srs->srs_async);
		mutex_exit(&srs->srs_lock);
		while (!(ringp->s_ring_state &
		    (S_RING_RESTART | S_RING_CONDEMNED)))
			cv_wait(&ringp->s_ring_async, &ringp->s_ring_lock);
		/* Re-take both locks in srs_lock -> s_ring_lock order. */
		mutex_exit(lock);
		mutex_enter(&srs->srs_lock);
		mutex_enter(lock);
		srs->srs_soft_ring_quiesced_count--;
		if (ringp->s_ring_state & S_RING_RESTART) {
			ASSERT(!(ringp->s_ring_state & S_RING_CONDEMNED));
			ringp->s_ring_state &= ~(S_RING_RESTART |
			    S_RING_QUIESCE | S_RING_QUIESCE_DONE);
			cv_broadcast(&srs->srs_async);
			mutex_exit(&srs->srs_lock);
			/* Resume normal packet processing. */
			goto start;
		}
	}
	/*
	 * Condemned: acknowledge, notify the SRS and exit the thread
	 * for good.  NOTE: CALLB_CPR_EXIT() drops s_ring_lock.
	 */
	ASSERT(ringp->s_ring_state & S_RING_CONDEMNED);
	ringp->s_ring_state |= S_RING_CONDEMNED_DONE;
	CALLB_CPR_EXIT(&cprinfo);
	srs->srs_soft_ring_condemned_count++;
	cv_broadcast(&srs->srs_async);
	mutex_exit(&srs->srs_lock);
	thread_exit();
}
 485 
 486 /*
 487  * mac_soft_ring_intr_enable and mac_soft_ring_intr_disable
 488  *
 489  * these functions are called to toggle the sending of packets to the
 * client. They are called by the client. The client gets the name
 * of these routines and the corresponding cookie (pointing to the softring)
 492  * during capability negotiation at setup time.
 493  *
 * Enabling allows the processing thread to send packets to the
 * client, while disabling does the opposite.
 496  */
 497 int
 498 mac_soft_ring_intr_enable(void *arg)
 499 {
 500         mac_soft_ring_t *ringp = (mac_soft_ring_t *)arg;
 501         mutex_enter(&ringp->s_ring_lock);
 502         ringp->s_ring_state &= ~S_RING_BLANK;
 503         if (ringp->s_ring_first != NULL)
 504                 mac_soft_ring_worker_wakeup(ringp);
 505         mutex_exit(&ringp->s_ring_lock);
 506         return (0);
 507 }
 508 
 509 boolean_t
 510 mac_soft_ring_intr_disable(void *arg)
 511 {
 512         mac_soft_ring_t *ringp = (mac_soft_ring_t *)arg;
 513         boolean_t sring_blanked = B_FALSE;
 514         /*
 515          * Stop worker thread from sending packets above.
 516          * Squeue will poll soft ring when it needs packets.
 517          */
 518         mutex_enter(&ringp->s_ring_lock);
 519         if (!(ringp->s_ring_state & S_RING_PROC)) {
 520                 ringp->s_ring_state |= S_RING_BLANK;
 521                 sring_blanked = B_TRUE;
 522         }
 523         mutex_exit(&ringp->s_ring_lock);
 524         return (sring_blanked);
 525 }
 526 
 527 /*
 528  * mac_soft_ring_poll
 529  *
 530  * This routine is called by the client to poll for packets from
 531  * the soft ring. The function name and cookie corresponding to
 532  * the soft ring is exchanged during capability negotiation during
 533  * setup.
 534  */
mblk_t *
mac_soft_ring_poll(mac_soft_ring_t *ringp, size_t bytes_to_pickup)
{
	mblk_t	*head, *tail;
	mblk_t	*mp;
	size_t	sz = 0;
	int	cnt = 0;
	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;

	ASSERT(mac_srs != NULL);

	mutex_enter(&ringp->s_ring_lock);
	head = tail = mp = ringp->s_ring_first;
	/* Nothing queued: return immediately. */
	if (head == NULL) {
		mutex_exit(&ringp->s_ring_lock);
		return (NULL);
	}

	if (ringp->s_ring_size <= bytes_to_pickup) {
		/* The whole queue fits within the request; take it all. */
		head = ringp->s_ring_first;
		ringp->s_ring_first = NULL;
		ringp->s_ring_last = NULL;
		cnt = ringp->s_ring_count;
		ringp->s_ring_count = 0;
		sz = ringp->s_ring_size;
		ringp->s_ring_size = 0;
	} else {
		/*
		 * Walk the chain until the accumulated size exceeds
		 * bytes_to_pickup (the final message may push us past
		 * the limit), then split the chain after 'tail'.
		 */
		while (mp && sz <= bytes_to_pickup) {
			sz += msgdsize(mp);
			cnt++;
			tail = mp;
			mp = mp->b_next;
		}
		ringp->s_ring_count -= cnt;
		ringp->s_ring_size -= sz;
		tail->b_next = NULL;
		if (mp == NULL) {
			ringp->s_ring_first = NULL;
			ringp->s_ring_last = NULL;
			ASSERT(ringp->s_ring_count == 0);
		} else {
			ringp->s_ring_first = mp;
		}
	}

	mutex_exit(&ringp->s_ring_lock);
	/*
	 * Update the shared count and size counters so
	 * that SRS has an accurate idea of queued packets.
	 */
	mutex_enter(&mac_srs->srs_lock);
	MAC_UPDATE_SRS_COUNT_LOCKED(mac_srs, cnt);
	MAC_UPDATE_SRS_SIZE_LOCKED(mac_srs, sz);
	mutex_exit(&mac_srs->srs_lock);
	return (head);
}
 591 
 592 /*
 593  * mac_soft_ring_dls_bypass
 594  *
 595  * Enable direct client (IP) callback function from the softrings.
 596  * Callers need to make sure they don't need any DLS layer processing
 597  */
 598 void
 599 mac_soft_ring_dls_bypass(void *arg, mac_direct_rx_t rx_func, void *rx_arg1)
 600 {
 601         mac_soft_ring_t         *softring = arg;
 602         mac_soft_ring_set_t     *srs;
 603 
 604         VERIFY3P(rx_func, !=, NULL);
 605 
 606         mutex_enter(&softring->s_ring_lock);
 607         softring->s_ring_rx_func = rx_func;
 608         softring->s_ring_rx_arg1 = rx_arg1;
 609         mutex_exit(&softring->s_ring_lock);
 610 
 611         srs = softring->s_ring_set;
 612         mutex_enter(&srs->srs_lock);
 613         srs->srs_type |= SRST_DLS_BYPASS;
 614         mutex_exit(&srs->srs_lock);
 615 }
 616 
 617 /*
 618  * mac_soft_ring_signal
 619  *
 620  * Typically used to set the soft ring state to QUIESCE, CONDEMNED, or
 621  * RESTART.
 622  *
 623  * In the Rx side, the quiescing is done bottom up. After the Rx upcalls
 624  * from the driver are done, then the Rx SRS is quiesced and only then can
 625  * we signal the soft rings. Thus this function can't be called arbitrarily
 626  * without satisfying the prerequisites. On the Tx side, the threads from
 627  * top need to quiesced, then the Tx SRS and only then can we signal the
 628  * Tx soft rings.
 629  */
void
mac_soft_ring_signal(mac_soft_ring_t *softring, uint_t sr_flag)
{
	/* Set the requested state flag(s) and wake the worker thread. */
	mutex_enter(&softring->s_ring_lock);
	softring->s_ring_state |= sr_flag;
	cv_signal(&softring->s_ring_async);
	mutex_exit(&softring->s_ring_lock);
}
 638 
 639 /*
 640  * mac_tx_soft_ring_drain
 641  *
 642  * The transmit side drain routine in case the soft ring was being
 643  * used to transmit packets.
 644  */
static void
mac_tx_soft_ring_drain(mac_soft_ring_t *ringp)
{
	mblk_t			*mp;
	void			*arg1;
	void			*arg2;
	mblk_t			*tail;
	uint_t			saved_pkt_count, saved_size;
	mac_tx_stats_t		stats;
	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;

	saved_pkt_count = saved_size = 0;
	ringp->s_ring_run = curthread;
	ASSERT(mutex_owned(&ringp->s_ring_lock));
	ASSERT(!(ringp->s_ring_state & S_RING_PROC));

	/* Mark the ring busy so only one thread drains it at a time. */
	ringp->s_ring_state |= S_RING_PROC;
	arg1 = ringp->s_ring_tx_arg1;
	arg2 = ringp->s_ring_tx_arg2;

	while (ringp->s_ring_first != NULL) {
		/*
		 * Detach the queued chain and save its counters, then
		 * drop the ring lock across the actual transmit.
		 */
		mp = ringp->s_ring_first;
		tail = ringp->s_ring_last;
		saved_pkt_count = ringp->s_ring_count;
		saved_size = ringp->s_ring_size;
		ringp->s_ring_first = NULL;
		ringp->s_ring_last = NULL;
		ringp->s_ring_count = 0;
		ringp->s_ring_size = 0;
		mutex_exit(&ringp->s_ring_lock);

		/* mac_tx_send() returns any packets it could not send. */
		mp = mac_tx_send(arg1, arg2, mp, &stats);

		mutex_enter(&ringp->s_ring_lock);
		if (mp != NULL) {
			/* Device out of tx desc, set block */
			/*
			 * Requeue the unsent remainder at the head of
			 * the queue and restore the count/size to cover
			 * what is still outstanding.
			 */
			tail->b_next = ringp->s_ring_first;
			ringp->s_ring_first = mp;
			ringp->s_ring_count +=
			    (saved_pkt_count - stats.mts_opackets);
			ringp->s_ring_size += (saved_size - stats.mts_obytes);
			if (ringp->s_ring_last == NULL)
				ringp->s_ring_last = tail;

			if (ringp->s_ring_tx_woken_up) {
				/*
				 * A Tx wakeup arrived while we were
				 * transmitting; consume it and leave the
				 * ring unblocked so the drain is retried.
				 */
				ringp->s_ring_tx_woken_up = B_FALSE;
			} else {
				ringp->s_ring_state |= S_RING_BLOCK;
				ringp->s_st_stat.mts_blockcnt++;
			}

			ringp->s_ring_state &= ~S_RING_PROC;
			ringp->s_ring_run = NULL;
			return;
		} else {
			/* Whole chain sent: fold stats into SRS and ring. */
			ringp->s_ring_tx_woken_up = B_FALSE;
			SRS_TX_STATS_UPDATE(mac_srs, &stats);
			SOFTRING_TX_STATS_UPDATE(ringp, &stats);
		}
	}

	/*
	 * The queue drained completely; if flow-control related flags
	 * are set, clear them and notify blocked clients that they may
	 * transmit again.
	 */
	if (ringp->s_ring_count == 0 && ringp->s_ring_state &
	    (S_RING_TX_HIWAT | S_RING_WAKEUP_CLIENT | S_RING_ENQUEUED)) {
		mac_client_impl_t *mcip =  ringp->s_ring_mcip;
		boolean_t wakeup_required = B_FALSE;

		if (ringp->s_ring_state &
		    (S_RING_TX_HIWAT|S_RING_WAKEUP_CLIENT)) {
			wakeup_required = B_TRUE;
		}
		ringp->s_ring_state &=
		    ~(S_RING_TX_HIWAT | S_RING_WAKEUP_CLIENT | S_RING_ENQUEUED);
		/* Drop the ring lock across the client callbacks. */
		mutex_exit(&ringp->s_ring_lock);
		if (wakeup_required) {
			mac_tx_invoke_callbacks(mcip, (mac_tx_cookie_t)ringp);
			/*
			 * If the client is not the primary MAC client, then we
			 * need to send the notification to the clients upper
			 * MAC, i.e. mci_upper_mip.
			 */
			mac_tx_notify(mcip->mci_upper_mip != NULL ?
			    mcip->mci_upper_mip : mcip->mci_mip);
		}
		mutex_enter(&ringp->s_ring_lock);
	}
	ringp->s_ring_state &= ~S_RING_PROC;
	ringp->s_ring_run = NULL;
}