1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  * Copyright 2017 Joyent, Inc.
  25  */
  26 
  27 /*
  28  * General Soft rings - Simulating Rx rings in S/W.
  29  *
  30  * Soft ring is a data abstraction containing a queue and a worker
  31  * thread and represents a hardware Rx ring in software. Each soft
  32  * ring set can have a collection of soft rings for separating
  33  * L3/L4 specific traffic (IPv4 from IPv6 or TCP from UDP) or for
  34  * allowing a higher degree of parallelism by sending traffic to
  35  * one of the soft rings for a SRS (using a hash on src IP or port).
  36  * Each soft ring worker thread can be bound to a different CPU
  37  * allowing the processing for each soft ring to happen in parallel
  38  * and independent from each other.
  39  *
  40  * Protocol soft rings:
  41  *
 * Each SRS has at a minimum 3 softrings. One each for IPv4 TCP,
  43  * IPv4 UDP and rest (OTH - for IPv6 and everything else). The
  44  * SRS does dynamic polling and enforces link level bandwidth but
  45  * it does so for all traffic (IPv4 and IPv6 and all protocols) on
  46  * that link. However, each protocol layer wants a different
  47  * behaviour. For instance IPv4 TCP has per CPU squeues which
  48  * enforce their own polling and flow control so IPv4 TCP traffic
  49  * needs to go to a separate soft ring which can be polled by the
  50  * TCP squeue. It also allows TCP squeue to push back flow control
  51  * all the way to NIC hardware (if it puts its corresponding soft
  52  * ring in the poll mode and soft ring queue builds up, the
  53  * shared srs_poll_pkt_cnt goes up and SRS automatically stops
  54  * more packets from entering the system).
  55  *
  56  * Similarly, the UDP benefits from a DLS bypass and packet chaining
  57  * so sending it to a separate soft ring is desired. All the rest of
 * the traffic (including IPv6) is sent to the OTH softring. The IPv6
 * traffic currently goes through the OTH softring and via DLS because
 * it needs more processing to be done. Irrespective of the sap
  61  * (IPv4 or IPv6) or the transport, the dynamic polling, B/W enforcement,
  62  * cpu assignment, fanout, etc apply to all traffic since they
 * are implemented by the SRS which is agnostic to sap or transport.
  64  *
  65  * Fanout soft rings:
  66  *
  67  * On a multithreaded system, we can assign more CPU and multi thread
  68  * the stack by creating a soft ring per CPU and spreading traffic
  69  * based on a hash computed on src IP etc. Since we still need to
 * keep the protocol separation, we create a set of 3 soft rings per
  71  * CPU (specified by cpu list or degree of fanout).
  72  *
  73  * NOTE: See the block level comment on top of mac_sched.c
  74  */
  75 
  76 #include <sys/types.h>
  77 #include <sys/callb.h>
  78 #include <sys/sdt.h>
  79 #include <sys/strsubr.h>
  80 #include <sys/strsun.h>
  81 #include <sys/vlan.h>
  82 #include <inet/ipsec_impl.h>
  83 #include <inet/ip_impl.h>
  84 #include <inet/sadb.h>
  85 #include <inet/ipsecesp.h>
  86 #include <inet/ipsecah.h>
  87 
  88 #include <sys/mac_impl.h>
  89 #include <sys/mac_client_impl.h>
  90 #include <sys/mac_soft_ring.h>
  91 #include <sys/mac_flow_impl.h>
  92 #include <sys/mac_stat.h>
  93 
  94 static void mac_rx_soft_ring_drain(mac_soft_ring_t *);
  95 static void mac_soft_ring_fire(void *);
  96 static void mac_soft_ring_worker(mac_soft_ring_t *);
  97 static void mac_tx_soft_ring_drain(mac_soft_ring_t *);
  98 
  99 uint32_t mac_tx_soft_ring_max_q_cnt = 100000;
 100 uint32_t mac_tx_soft_ring_hiwat = 1000;
 101 
 102 extern kmem_cache_t *mac_soft_ring_cache;
 103 
 104 #define ADD_SOFTRING_TO_SET(mac_srs, softring) {                        \
 105         if (mac_srs->srs_soft_ring_head == NULL) {                   \
 106                 mac_srs->srs_soft_ring_head = softring;                      \
 107                 mac_srs->srs_soft_ring_tail = softring;                      \
 108         } else {                                                        \
 109                 /* ADD to the list */                                   \
 110                 softring->s_ring_prev =                                      \
 111                         mac_srs->srs_soft_ring_tail;                 \
 112                 mac_srs->srs_soft_ring_tail->s_ring_next = softring;      \
 113                 mac_srs->srs_soft_ring_tail = softring;                      \
 114         }                                                               \
 115         mac_srs->srs_soft_ring_count++;                                      \
 116 }
 117 
 118 /*
 119  * mac_soft_ring_worker_wakeup
 120  *
 121  * Wake up the soft ring worker thread to process the queue as long
 122  * as no one else is processing it and upper layer (client) is still
 123  * ready to receive packets.
 124  */
 125 void
 126 mac_soft_ring_worker_wakeup(mac_soft_ring_t *ringp)
 127 {
 128         ASSERT(MUTEX_HELD(&ringp->s_ring_lock));
 129         if (!(ringp->s_ring_state & S_RING_PROC) &&
 130             !(ringp->s_ring_state & S_RING_BLANK) &&
 131             (ringp->s_ring_tid == NULL)) {
 132                 if (ringp->s_ring_wait != 0) {
 133                         ringp->s_ring_tid =
 134                             timeout(mac_soft_ring_fire, ringp,
 135                             ringp->s_ring_wait);
 136                 } else {
 137                         /* Schedule the worker thread. */
 138                         cv_signal(&ringp->s_ring_async);
 139                 }
 140         }
 141 }
 142 
 143 /*
 144  * mac_soft_ring_create
 145  *
 146  * Create a soft ring, do the necessary setup and bind the worker
 147  * thread to the assigned CPU.
 148  */
 149 mac_soft_ring_t *
 150 mac_soft_ring_create(int id, clock_t wait, uint16_t type,
 151     pri_t pri, mac_client_impl_t *mcip, mac_soft_ring_set_t *mac_srs,
 152     processorid_t cpuid, mac_direct_rx_t rx_func, void *x_arg1,
 153     mac_resource_handle_t x_arg2)
 154 {
 155         mac_soft_ring_t         *ringp;
 156         char                    name[S_RING_NAMELEN];
 157 
 158         bzero(name, 64);
 159         ringp = kmem_cache_alloc(mac_soft_ring_cache, KM_SLEEP);
 160 
 161         if (type & ST_RING_TCP) {
 162                 (void) snprintf(name, sizeof (name),
 163                     "mac_tcp_soft_ring_%d_%p", id, (void *)mac_srs);
 164         } else if (type & ST_RING_UDP) {
 165                 (void) snprintf(name, sizeof (name),
 166                     "mac_udp_soft_ring_%d_%p", id, (void *)mac_srs);
 167         } else if (type & ST_RING_OTH) {
 168                 (void) snprintf(name, sizeof (name),
 169                     "mac_oth_soft_ring_%d_%p", id, (void *)mac_srs);
 170         } else {
 171                 ASSERT(type & ST_RING_TX);
 172                 (void) snprintf(name, sizeof (name),
 173                     "mac_tx_soft_ring_%d_%p", id, (void *)mac_srs);
 174         }
 175 
 176         bzero(ringp, sizeof (mac_soft_ring_t));
 177         (void) strncpy(ringp->s_ring_name, name, S_RING_NAMELEN + 1);
 178         ringp->s_ring_name[S_RING_NAMELEN] = '\0';
 179         mutex_init(&ringp->s_ring_lock, NULL, MUTEX_DEFAULT, NULL);
 180         ringp->s_ring_notify_cb_info.mcbi_lockp = &ringp->s_ring_lock;
 181 
 182         ringp->s_ring_type = type;
 183         ringp->s_ring_wait = MSEC_TO_TICK(wait);
 184         ringp->s_ring_mcip = mcip;
 185         ringp->s_ring_set = mac_srs;
 186 
 187         /*
 188          * Protect against access from DR callbacks (mac_walk_srs_bind/unbind)
 189          * which can't grab the mac perimeter
 190          */
 191         mutex_enter(&mac_srs->srs_lock);
 192         ADD_SOFTRING_TO_SET(mac_srs, ringp);
 193         mutex_exit(&mac_srs->srs_lock);
 194 
 195         /*
 196          * set the bind CPU to -1 to indicate
 197          * no thread affinity set
 198          */
 199         ringp->s_ring_cpuid = ringp->s_ring_cpuid_save = -1;
 200         ringp->s_ring_worker = thread_create(NULL, 0,
 201             mac_soft_ring_worker, ringp, 0, &p0, TS_RUN, pri);
 202         if (type & ST_RING_TX) {
 203                 ringp->s_ring_drain_func = mac_tx_soft_ring_drain;
 204                 ringp->s_ring_tx_arg1 = x_arg1;
 205                 ringp->s_ring_tx_arg2 = x_arg2;
 206                 ringp->s_ring_tx_max_q_cnt = mac_tx_soft_ring_max_q_cnt;
 207                 ringp->s_ring_tx_hiwat =
 208                     (mac_tx_soft_ring_hiwat > mac_tx_soft_ring_max_q_cnt) ?
 209                     mac_tx_soft_ring_max_q_cnt : mac_tx_soft_ring_hiwat;
 210                 if (mcip->mci_state_flags & MCIS_IS_AGGR) {
 211                         mac_srs_tx_t *tx = &mac_srs->srs_tx;
 212 
 213                         ASSERT(tx->st_soft_rings[
 214                             ((mac_ring_t *)x_arg2)->mr_index] == NULL);
 215                         tx->st_soft_rings[((mac_ring_t *)x_arg2)->mr_index] =
 216                             ringp;
 217                 }
 218         } else {
 219                 ringp->s_ring_drain_func = mac_rx_soft_ring_drain;
 220                 ringp->s_ring_rx_func = rx_func;
 221                 ringp->s_ring_rx_arg1 = x_arg1;
 222                 ringp->s_ring_rx_arg2 = x_arg2;
 223                 if (mac_srs->srs_state & SRS_SOFTRING_QUEUE)
 224                         ringp->s_ring_type |= ST_RING_WORKER_ONLY;
 225         }
 226         if (cpuid != -1)
 227                 (void) mac_soft_ring_bind(ringp, cpuid);
 228 
 229         mac_soft_ring_stat_create(ringp);
 230 
 231         return (ringp);
 232 }
 233 
 234 /*
 235  * mac_soft_ring_free
 236  *
 237  * Free the soft ring once we are done with it.
 238  */
void
mac_soft_ring_free(mac_soft_ring_t *softring)
{
	/*
	 * The ring must be fully condemned (and no drain in progress)
	 * before its memory can be released.
	 */
	ASSERT((softring->s_ring_state &
	    (S_RING_CONDEMNED | S_RING_CONDEMNED_DONE | S_RING_PROC)) ==
	    (S_RING_CONDEMNED | S_RING_CONDEMNED_DONE));
	/* Drop any packets still queued on the ring. */
	mac_pkt_drop(NULL, NULL, softring->s_ring_first, B_FALSE);
	softring->s_ring_tx_arg2 = NULL;
	mac_soft_ring_stat_delete(softring);
	mac_callback_free(softring->s_ring_notify_cb_list);
	kmem_cache_free(mac_soft_ring_cache, softring);
}
 251 
/*
 * Tunable: when set to zero, soft ring worker threads are never bound
 * to a CPU (mac_soft_ring_bind() becomes a no-op).
 */
int mac_soft_ring_thread_bind = 1;
 253 
 254 /*
 255  * mac_soft_ring_bind
 256  *
 257  * Bind a soft ring worker thread to supplied CPU.
 258  */
cpu_t *
mac_soft_ring_bind(mac_soft_ring_t *ringp, processorid_t cpuid)
{
	cpu_t *cp;
	boolean_t clear = B_FALSE;

	ASSERT(MUTEX_HELD(&cpu_lock));

	/* Binding can be disabled system-wide via this tunable. */
	if (mac_soft_ring_thread_bind == 0) {
		DTRACE_PROBE1(mac__soft__ring__no__cpu__bound,
		    mac_soft_ring_t *, ringp);
		return (NULL);
	}

	/* Refuse to bind to a nonexistent or offline CPU. */
	cp = cpu_get(cpuid);
	if (cp == NULL || !cpu_is_online(cp))
		return (NULL);

	mutex_enter(&ringp->s_ring_lock);
	ringp->s_ring_state |= S_RING_BOUND;
	/* Note whether a previous binding must be cleared first. */
	if (ringp->s_ring_cpuid != -1)
		clear = B_TRUE;
	ringp->s_ring_cpuid = cpuid;
	mutex_exit(&ringp->s_ring_lock);

	/* Drop the old affinity before establishing the new one. */
	if (clear)
		thread_affinity_clear(ringp->s_ring_worker);

	DTRACE_PROBE2(mac__soft__ring__cpu__bound, mac_soft_ring_t *,
	    ringp, processorid_t, cpuid);

	thread_affinity_set(ringp->s_ring_worker, cpuid);

	return (cp);
}
 294 
 295 /*
 296  * mac_soft_ring_unbind
 297  *
 298  * Un Bind a soft ring worker thread.
 299  */
void
mac_soft_ring_unbind(mac_soft_ring_t *ringp)
{
	ASSERT(MUTEX_HELD(&cpu_lock));

	mutex_enter(&ringp->s_ring_lock);
	/* Nothing to do if the worker thread was never bound. */
	if (!(ringp->s_ring_state & S_RING_BOUND)) {
		ASSERT(ringp->s_ring_cpuid == -1);
		mutex_exit(&ringp->s_ring_lock);
		return;
	}

	ringp->s_ring_cpuid = -1;
	ringp->s_ring_state &= ~S_RING_BOUND;
	/* Release the worker thread's CPU affinity. */
	thread_affinity_clear(ringp->s_ring_worker);
	mutex_exit(&ringp->s_ring_lock);
}
 317 
 318 /*
 319  * PRIVATE FUNCTIONS
 320  */
 321 
/*
 * Timeout handler armed by mac_soft_ring_worker_wakeup(); wakes the
 * worker thread once the s_ring_wait delay has elapsed.
 */
static void
mac_soft_ring_fire(void *arg)
{
	mac_soft_ring_t *ringp = arg;

	mutex_enter(&ringp->s_ring_lock);
	/* A NULL tid means the timeout was already cancelled. */
	if (ringp->s_ring_tid == NULL) {
		mutex_exit(&ringp->s_ring_lock);
		return;
	}

	ringp->s_ring_tid = NULL;

	/* Only wake the worker if no drain is currently in progress. */
	if (!(ringp->s_ring_state & S_RING_PROC)) {
		cv_signal(&ringp->s_ring_async);
	}
	mutex_exit(&ringp->s_ring_lock);
}
 340 
 341 /*
 342  * mac_rx_soft_ring_drain
 343  *
 344  * Called when worker thread model (ST_RING_WORKER_ONLY) of processing
 * incoming packets is used. s_ring_first contains the queued packets.
 346  * s_ring_rx_func contains the upper level (client) routine where the
 347  * packets are destined and s_ring_rx_arg1/s_ring_rx_arg2 are the
 348  * cookie meant for the client.
 349  */
/* ARGSUSED */
static void
mac_rx_soft_ring_drain(mac_soft_ring_t *ringp)
{
	mblk_t		*mp;
	void		*arg1;
	mac_resource_handle_t arg2;
	timeout_id_t	tid;
	mac_direct_rx_t proc;
	size_t		sz;
	int		cnt;
	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;

	ringp->s_ring_run = curthread;
	ASSERT(mutex_owned(&ringp->s_ring_lock));
	ASSERT(!(ringp->s_ring_state & S_RING_PROC));

	/* Take over any pending wakeup timeout; we are draining now. */
	if ((tid = ringp->s_ring_tid) != NULL)
		ringp->s_ring_tid = NULL;

	ringp->s_ring_state |= S_RING_PROC;

	proc = ringp->s_ring_rx_func;
	arg1 = ringp->s_ring_rx_arg1;
	arg2 = ringp->s_ring_rx_arg2;

	while ((ringp->s_ring_first != NULL) &&
	    !(ringp->s_ring_state & S_RING_PAUSE)) {
		/* Claim the whole chain, then drop the lock to deliver. */
		mp = ringp->s_ring_first;
		ringp->s_ring_first = NULL;
		ringp->s_ring_last = NULL;
		cnt = ringp->s_ring_count;
		ringp->s_ring_count = 0;
		sz = ringp->s_ring_size;
		ringp->s_ring_size = 0;
		mutex_exit(&ringp->s_ring_lock);

		/* Cancel the timeout outside the lock on the first pass. */
		if (tid != NULL) {
			(void) untimeout(tid);
			tid = NULL;
		}

		/* Hand the packet chain up to the client. */
		(*proc)(arg1, arg2, mp, NULL);

		/*
		 * If we have a soft ring set which is doing
		 * bandwidth control, we need to decrement its
		 * srs_size so it can have a accurate idea of
		 * what is the real data queued between SRS and
		 * its soft rings. We decrement the size for a
		 * packet only when it gets processed by both
		 * SRS and the soft ring.
		 */
		mutex_enter(&mac_srs->srs_lock);
		MAC_UPDATE_SRS_COUNT_LOCKED(mac_srs, cnt);
		MAC_UPDATE_SRS_SIZE_LOCKED(mac_srs, sz);
		mutex_exit(&mac_srs->srs_lock);

		mutex_enter(&ringp->s_ring_lock);
	}
	ringp->s_ring_state &= ~S_RING_PROC;
	/* Wake any thread waiting for this drain to complete. */
	if (ringp->s_ring_state & S_RING_CLIENT_WAIT)
		cv_signal(&ringp->s_ring_client_cv);
	ringp->s_ring_run = NULL;
}
 415 
 416 /*
 417  * mac_soft_ring_worker
 418  *
 419  * The soft ring worker routine to process any queued packets. In
 * normal case, the worker thread is bound to a CPU. If the soft
 421  * ring is dealing with TCP packets, then the worker thread will
 422  * be bound to the same CPU as the TCP squeue.
 423  */
static void
mac_soft_ring_worker(mac_soft_ring_t *ringp)
{
	kmutex_t *lock = &ringp->s_ring_lock;
	kcondvar_t *async = &ringp->s_ring_async;
	mac_soft_ring_set_t *srs = ringp->s_ring_set;
	callb_cpr_t cprinfo;

	/* Register with CPR so suspend/resume can account for us. */
	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "mac_soft_ring");
	mutex_enter(lock);
start:
	for (;;) {
		/*
		 * Sleep until there is work to do (packets queued and
		 * the ring is neither blocked nor blanked), or until
		 * we are asked to pause.  Also wait out any drain in
		 * progress (S_RING_PROC).
		 */
		while (((ringp->s_ring_first == NULL ||
		    (ringp->s_ring_state & (S_RING_BLOCK|S_RING_BLANK))) &&
		    !(ringp->s_ring_state & S_RING_PAUSE)) ||
		    (ringp->s_ring_state & S_RING_PROC)) {

			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}

		/*
		 * Either we have work to do, or we have been asked to
		 * shutdown temporarily or permanently
		 */
		if (ringp->s_ring_state & S_RING_PAUSE)
			goto done;

		ringp->s_ring_drain_func(ringp);
	}
done:
	/*
	 * Pause requested: enter the quiesce/condemn handshake.  The
	 * lock ordering here is srs_lock before s_ring_lock, so drop
	 * and reacquire in that order.
	 */
	mutex_exit(lock);
	mutex_enter(&srs->srs_lock);
	mutex_enter(lock);

	ringp->s_ring_state |= S_RING_QUIESCE_DONE;
	if (!(ringp->s_ring_state & S_RING_CONDEMNED)) {
		/* Quiesced only: notify the SRS, then wait for a verdict. */
		srs->srs_soft_ring_quiesced_count++;
		cv_broadcast(&srs->srs_async);
		mutex_exit(&srs->srs_lock);
		while (!(ringp->s_ring_state &
		    (S_RING_RESTART | S_RING_CONDEMNED)))
			cv_wait(&ringp->s_ring_async, &ringp->s_ring_lock);
		mutex_exit(lock);
		mutex_enter(&srs->srs_lock);
		mutex_enter(lock);
		srs->srs_soft_ring_quiesced_count--;
		if (ringp->s_ring_state & S_RING_RESTART) {
			ASSERT(!(ringp->s_ring_state & S_RING_CONDEMNED));
			/* Clear quiesce state and resume processing. */
			ringp->s_ring_state &= ~(S_RING_RESTART |
			    S_RING_QUIESCE | S_RING_QUIESCE_DONE);
			cv_broadcast(&srs->srs_async);
			mutex_exit(&srs->srs_lock);
			goto start;
		}
	}
	/* Condemned: acknowledge, tell the SRS, and exit for good. */
	ASSERT(ringp->s_ring_state & S_RING_CONDEMNED);
	ringp->s_ring_state |= S_RING_CONDEMNED_DONE;
	CALLB_CPR_EXIT(&cprinfo);
	srs->srs_soft_ring_condemned_count++;
	cv_broadcast(&srs->srs_async);
	mutex_exit(&srs->srs_lock);
	thread_exit();
}
 489 
 490 /*
 491  * mac_soft_ring_intr_enable and mac_soft_ring_intr_disable
 492  *
 493  * these functions are called to toggle the sending of packets to the
 494  * client. They are called by the client. the client gets the name
 495  * of these routine and corresponding cookie (pointing to softring)
 496  * during capability negotiation at setup time.
 497  *
 * Enabling allows the processing thread to send packets to the
 499  * client while disabling does the opposite.
 500  */
 501 int
 502 mac_soft_ring_intr_enable(void *arg)
 503 {
 504         mac_soft_ring_t *ringp = (mac_soft_ring_t *)arg;
 505         mutex_enter(&ringp->s_ring_lock);
 506         ringp->s_ring_state &= ~S_RING_BLANK;
 507         if (ringp->s_ring_first != NULL)
 508                 mac_soft_ring_worker_wakeup(ringp);
 509         mutex_exit(&ringp->s_ring_lock);
 510         return (0);
 511 }
 512 
 513 boolean_t
 514 mac_soft_ring_intr_disable(void *arg)
 515 {
 516         mac_soft_ring_t *ringp = (mac_soft_ring_t *)arg;
 517         boolean_t sring_blanked = B_FALSE;
 518         /*
 519          * Stop worker thread from sending packets above.
 520          * Squeue will poll soft ring when it needs packets.
 521          */
 522         mutex_enter(&ringp->s_ring_lock);
 523         if (!(ringp->s_ring_state & S_RING_PROC)) {
 524                 ringp->s_ring_state |= S_RING_BLANK;
 525                 sring_blanked = B_TRUE;
 526         }
 527         mutex_exit(&ringp->s_ring_lock);
 528         return (sring_blanked);
 529 }
 530 
 531 /*
 532  * mac_soft_ring_poll
 533  *
 534  * This routine is called by the client to poll for packets from
 535  * the soft ring. The function name and cookie corresponding to
 536  * the soft ring is exchanged during capability negotiation during
 537  * setup.
 538  */
mblk_t *
mac_soft_ring_poll(mac_soft_ring_t *ringp, size_t bytes_to_pickup)
{
	mblk_t	*head, *tail;
	mblk_t	*mp;
	size_t	sz = 0;
	int	cnt = 0;
	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;

	ASSERT(mac_srs != NULL);

	mutex_enter(&ringp->s_ring_lock);
	head = tail = mp = ringp->s_ring_first;
	/* Nothing queued: return without touching the SRS counters. */
	if (head == NULL) {
		mutex_exit(&ringp->s_ring_lock);
		return (NULL);
	}

	if (ringp->s_ring_size <= bytes_to_pickup) {
		/* The whole queue fits in the request; take everything. */
		head = ringp->s_ring_first;
		ringp->s_ring_first = NULL;
		ringp->s_ring_last = NULL;
		cnt = ringp->s_ring_count;
		ringp->s_ring_count = 0;
		sz = ringp->s_ring_size;
		ringp->s_ring_size = 0;
	} else {
		/*
		 * Walk the chain until we pass bytes_to_pickup and
		 * split it there, leaving the remainder queued.
		 */
		while (mp && sz <= bytes_to_pickup) {
			sz += msgdsize(mp);
			cnt++;
			tail = mp;
			mp = mp->b_next;
		}
		ringp->s_ring_count -= cnt;
		ringp->s_ring_size -= sz;
		tail->b_next = NULL;
		if (mp == NULL) {
			ringp->s_ring_first = NULL;
			ringp->s_ring_last = NULL;
			ASSERT(ringp->s_ring_count == 0);
		} else {
			ringp->s_ring_first = mp;
		}
	}

	mutex_exit(&ringp->s_ring_lock);
	/*
	 * Update the shared count and size counters so
	 * that SRS has a accurate idea of queued packets.
	 */
	mutex_enter(&mac_srs->srs_lock);
	MAC_UPDATE_SRS_COUNT_LOCKED(mac_srs, cnt);
	MAC_UPDATE_SRS_SIZE_LOCKED(mac_srs, sz);
	mutex_exit(&mac_srs->srs_lock);
	return (head);
}
 595 
 596 /*
 597  * mac_soft_ring_dls_bypass
 598  *
 599  * Enable direct client (IP) callback function from the softrings.
 600  * Callers need to make sure they don't need any DLS layer processing
 601  */
 602 void
 603 mac_soft_ring_dls_bypass(void *arg, mac_direct_rx_t rx_func, void *rx_arg1)
 604 {
 605         mac_soft_ring_t         *softring = arg;
 606         mac_soft_ring_set_t     *srs;
 607 
 608         ASSERT(rx_func != NULL);
 609 
 610         mutex_enter(&softring->s_ring_lock);
 611         softring->s_ring_rx_func = rx_func;
 612         softring->s_ring_rx_arg1 = rx_arg1;
 613         mutex_exit(&softring->s_ring_lock);
 614 
 615         srs = softring->s_ring_set;
 616         mutex_enter(&srs->srs_lock);
 617         srs->srs_type |= SRST_DLS_BYPASS;
 618         mutex_exit(&srs->srs_lock);
 619 }
 620 
 621 /*
 622  * mac_soft_ring_signal
 623  *
 624  * Typically used to set the soft ring state to QUIESCE, CONDEMNED, or
 625  * RESTART.
 626  *
 627  * In the Rx side, the quiescing is done bottom up. After the Rx upcalls
 628  * from the driver are done, then the Rx SRS is quiesced and only then can
 629  * we signal the soft rings. Thus this function can't be called arbitrarily
 630  * without satisfying the prerequisites. On the Tx side, the threads from
 631  * top need to quiesced, then the Tx SRS and only then can we signal the
 632  * Tx soft rings.
 633  */
 634 void
 635 mac_soft_ring_signal(mac_soft_ring_t *softring, uint_t sr_flag)
 636 {
 637         mutex_enter(&softring->s_ring_lock);
 638         softring->s_ring_state |= sr_flag;
 639         cv_signal(&softring->s_ring_async);
 640         mutex_exit(&softring->s_ring_lock);
 641 }
 642 
 643 /*
 644  * mac_tx_soft_ring_drain
 645  *
 646  * The transmit side drain routine in case the soft ring was being
 647  * used to transmit packets.
 648  */
static void
mac_tx_soft_ring_drain(mac_soft_ring_t *ringp)
{
	mblk_t			*mp;
	void			*arg1;
	void			*arg2;
	mblk_t			*tail;
	uint_t			saved_pkt_count, saved_size;
	mac_tx_stats_t		stats;
	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;

	saved_pkt_count = saved_size = 0;
	ringp->s_ring_run = curthread;
	ASSERT(mutex_owned(&ringp->s_ring_lock));
	ASSERT(!(ringp->s_ring_state & S_RING_PROC));

	ringp->s_ring_state |= S_RING_PROC;
	arg1 = ringp->s_ring_tx_arg1;
	arg2 = ringp->s_ring_tx_arg2;

	while (ringp->s_ring_first != NULL) {
		/* Claim the queued chain, drop the lock, and transmit. */
		mp = ringp->s_ring_first;
		tail = ringp->s_ring_last;
		saved_pkt_count = ringp->s_ring_count;
		saved_size = ringp->s_ring_size;
		ringp->s_ring_first = NULL;
		ringp->s_ring_last = NULL;
		ringp->s_ring_count = 0;
		ringp->s_ring_size = 0;
		mutex_exit(&ringp->s_ring_lock);

		/* Returns the unsent remainder of the chain, if any. */
		mp = mac_tx_send(arg1, arg2, mp, &stats);

		mutex_enter(&ringp->s_ring_lock);
		if (mp != NULL) {
			/* Device out of tx desc, set block */
			tail->b_next = ringp->s_ring_first;
			ringp->s_ring_first = mp;
			/*
			 * Re-account only the portion of the chain that
			 * mac_tx_send() did not transmit.
			 */
			ringp->s_ring_count +=
			    (saved_pkt_count - stats.mts_opackets);
			ringp->s_ring_size += (saved_size - stats.mts_obytes);
			if (ringp->s_ring_last == NULL)
				ringp->s_ring_last = tail;

			if (ringp->s_ring_tx_woken_up) {
				/*
				 * A Tx wakeup arrived while we were
				 * sending (presumably the device freed
				 * descriptors); retry instead of
				 * blocking the ring.
				 */
				ringp->s_ring_tx_woken_up = B_FALSE;
			} else {
				ringp->s_ring_state |= S_RING_BLOCK;
				ringp->s_st_stat.mts_blockcnt++;
			}

			ringp->s_ring_state &= ~S_RING_PROC;
			ringp->s_ring_run = NULL;
			return;
		} else {
			/* Full chain sent; fold stats into SRS and ring. */
			ringp->s_ring_tx_woken_up = B_FALSE;
			SRS_TX_STATS_UPDATE(mac_srs, &stats);
			SOFTRING_TX_STATS_UPDATE(ringp, &stats);
		}
	}

	/*
	 * Queue fully drained: if flow control had engaged, clear the
	 * state and notify blocked clients they may transmit again.
	 */
	if (ringp->s_ring_count == 0 && ringp->s_ring_state &
	    (S_RING_TX_HIWAT | S_RING_WAKEUP_CLIENT | S_RING_ENQUEUED)) {
		mac_client_impl_t *mcip =  ringp->s_ring_mcip;
		boolean_t wakeup_required = B_FALSE;

		if (ringp->s_ring_state &
		    (S_RING_TX_HIWAT|S_RING_WAKEUP_CLIENT)) {
			wakeup_required = B_TRUE;
		}
		ringp->s_ring_state &=
		    ~(S_RING_TX_HIWAT | S_RING_WAKEUP_CLIENT | S_RING_ENQUEUED);
		/* Drop the ring lock around the client callbacks. */
		mutex_exit(&ringp->s_ring_lock);
		if (wakeup_required) {
			mac_tx_invoke_callbacks(mcip, (mac_tx_cookie_t)ringp);
			/*
			 * If the client is not the primary MAC client, then we
			 * need to send the notification to the clients upper
			 * MAC, i.e. mci_upper_mip.
			 */
			mac_tx_notify(mcip->mci_upper_mip != NULL ?
			    mcip->mci_upper_mip : mcip->mci_mip);
		}
		mutex_enter(&ringp->s_ring_lock);
	}
	ringp->s_ring_state &= ~S_RING_PROC;
	ringp->s_ring_run = NULL;
}