[BACK]Return to rf_engine.c CVS log [TXT][DIR] Up to [local] / sys / dev / raidframe

Annotation of sys/dev/raidframe/rf_engine.c, Revision 1.1.1.1

1.1       nbrk        1: /*     $OpenBSD: rf_engine.c,v 1.15 2003/04/27 11:22:54 ho Exp $       */
                      2: /*     $NetBSD: rf_engine.c,v 1.10 2000/08/20 16:51:03 thorpej Exp $   */
                      3:
                      4: /*
                      5:  * Copyright (c) 1995 Carnegie-Mellon University.
                      6:  * All rights reserved.
                      7:  *
                      8:  * Author: William V. Courtright II, Mark Holland, Rachad Youssef
                      9:  *
                     10:  * Permission to use, copy, modify and distribute this software and
                     11:  * its documentation is hereby granted, provided that both the copyright
                     12:  * notice and this permission notice appear in all copies of the
                     13:  * software, derivative works or modified versions, and any portions
                     14:  * thereof, and that both notices appear in supporting documentation.
                     15:  *
                     16:  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
                     17:  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
                     18:  * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
                     19:  *
                     20:  * Carnegie Mellon requests users of this software to return to
                     21:  *
                     22:  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
                     23:  *  School of Computer Science
                     24:  *  Carnegie Mellon University
                     25:  *  Pittsburgh PA 15213-3890
                     26:  *
                     27:  * any improvements or extensions that they make and grant Carnegie the
                     28:  * rights to redistribute these changes.
                     29:  */
                     30:
                     31: /****************************************************************************
                     32:  *                                                                          *
                     33:  * engine.c -- Code for DAG execution engine.                               *
                     34:  *                                                                          *
                     35:  * Modified to work as follows (holland):                                   *
                     36:  *   A user-thread calls into DispatchDAG, which fires off the nodes that   *
                     37:  *   are direct successors to the header node. DispatchDAG then returns,    *
                     38:  *   and the rest of the I/O continues asynchronously. As each node         *
                     39:  *   completes, the node execution function calls FinishNode(). FinishNode  *
                     40:  *   scans the list of successors to the node and increments the antecedent *
                     41:  *   counts. Each node that becomes enabled is placed on a central node     *
                     42:  *   queue. A dedicated dag-execution thread grabs nodes off of this        *
                     43:  *   queue and fires them.                                                  *
                     44:  *                                                                          *
                     45:  *   NULL nodes are never fired.                                            *
                     46:  *                                                                          *
                     47:  *   Terminator nodes are never fired, but rather cause the callback        *
                     48:  *   associated with the DAG to be invoked.                                 *
                     49:  *                                                                          *
                     50:  *   If a node fails, the dag either rolls forward to the completion or     *
                     51:  *   rolls back, undoing previously-completed nodes and fails atomically.   *
                     52:  *   The direction of recovery is determined by the location of the failed  *
                     53:  *   node in the graph. If the failure occurred before the commit node in   *
                     54:  *   the graph, backward recovery is used. Otherwise, forward recovery is   *
                     55:  *   used.                                                                  *
                     56:  *                                                                          *
                     57:  ****************************************************************************/
                     58:
                     59: #include "rf_threadstuff.h"
                     60:
                     61: #include <sys/errno.h>
                     62:
                     63: #include "rf_dag.h"
                     64: #include "rf_engine.h"
                     65: #include "rf_etimer.h"
                     66: #include "rf_general.h"
                     67: #include "rf_dagutils.h"
                     68: #include "rf_shutdown.h"
                     69: #include "rf_raid.h"
                     70:
                     71: int  rf_BranchDone(RF_DagNode_t *);
                     72: int  rf_NodeReady(RF_DagNode_t *);
                     73: void rf_FireNode(RF_DagNode_t *);
                     74: void rf_FireNodeArray(int, RF_DagNode_t **);
                     75: void rf_FireNodeList(RF_DagNode_t *);
                     76: void rf_PropagateResults(RF_DagNode_t *, int);
                     77: void rf_ProcessNode(RF_DagNode_t *, int);
                     78:
                     79: void rf_DAGExecutionThread(RF_ThreadArg_t);
                     80: #ifdef RAID_AUTOCONFIG
                     81: #define        RF_ENGINE_PID   10
                     82: void rf_DAGExecutionThread_pre(RF_ThreadArg_t);
                     83: extern pid_t     lastpid;
                     84: #endif /* RAID_AUTOCONFIG */
                     85: void           **rf_hook_cookies;
                     86: extern int       numraid;
                     87:
                     88: #define        DO_INIT(_l_,_r_)                                                \
                     89: do {                                                                   \
                     90:        int _rc;                                                        \
                     91:        _rc = rf_create_managed_mutex(_l_, &(_r_)->node_queue_mutex);   \
                     92:        if (_rc) {                                                      \
                     93:                return(_rc);                                            \
                     94:        }                                                               \
                     95:        _rc = rf_create_managed_cond(_l_, &(_r_)->node_queue_cond);     \
                     96:        if (_rc) {                                                      \
                     97:                return(_rc);                                            \
                     98:        }                                                               \
                     99: } while (0)
                    100:
                    101: /*
                    102:  * Synchronization primitives for this file. DO_WAIT should be enclosed
                    103:  * in a while loop.
                    104:  */
                    105:
                    106: /*
                    107:  * XXX Is this spl-ing really necessary ?
                    108:  */
                    109: #define        DO_LOCK(_r_)                                                    \
                    110: do {                                                                   \
                    111:        ks = splbio();                                                  \
                    112:        RF_LOCK_MUTEX((_r_)->node_queue_mutex);                         \
                    113: } while (0)
                    114:
                    115: #define        DO_UNLOCK(_r_)                                                  \
                    116: do {                                                                   \
                    117:        RF_UNLOCK_MUTEX((_r_)->node_queue_mutex);                       \
                    118:        splx(ks);                                                       \
                    119: } while (0)
                    120:
                    121: #define        DO_WAIT(_r_)                                                    \
                    122:        RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)
                    123:
                    124: /* XXX RF_SIGNAL_COND? */
                    125: #define        DO_SIGNAL(_r_)                                                  \
                    126:        RF_BROADCAST_COND((_r_)->node_queue)
                    127:
                    128: void rf_ShutdownEngine(void *);
                    129:
                    130: void
                    131: rf_ShutdownEngine(void *arg)
                    132: {
                    133:        RF_Raid_t *raidPtr;
                    134:
                    135:        raidPtr = (RF_Raid_t *) arg;
                    136:        raidPtr->shutdown_engine = 1;
                    137:        DO_SIGNAL(raidPtr);
                    138: }
                    139:
                    140: int
                    141: rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
                    142:     RF_Config_t *cfgPtr)
                    143: {
                    144:        int rc;
                    145:        char raidname[16];
                    146:
                    147:        DO_INIT(listp, raidPtr);
                    148:
                    149:        raidPtr->node_queue = NULL;
                    150:        raidPtr->dags_in_flight = 0;
                    151:
                    152:        rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg);
                    153:        if (rc)
                    154:                return (rc);
                    155:
                    156:        /*
                    157:         * We create the execution thread only once per system boot. No need
                    158:         * to check return code b/c the kernel panics if it can't create the
                    159:         * thread.
                    160:         */
                    161:        if (rf_engineDebug) {
                    162:                printf("raid%d: %s engine thread\n", raidPtr->raidid,
                    163:                    (initproc)?"Starting":"Creating");
                    164:        }
                    165:        if (rf_hook_cookies == NULL) {
                    166:                rf_hook_cookies =
                    167:                    malloc(numraid * sizeof(void *),
                    168:                           M_RAIDFRAME, M_NOWAIT);
                    169:                if (rf_hook_cookies == NULL)
                    170:                        return (ENOMEM);
                    171:                bzero(rf_hook_cookies, numraid * sizeof(void *));
                    172:        }
                    173: #ifdef RAID_AUTOCONFIG
                    174:        if (initproc == NULL) {
                    175:                rf_hook_cookies[raidPtr->raidid] =
                    176:                        startuphook_establish(rf_DAGExecutionThread_pre,
                    177:                            raidPtr);
                    178:        } else {
                    179: #endif /* RAID_AUTOCONFIG */
                    180:                snprintf(&raidname[0], 16, "raid%d", raidPtr->raidid);
                    181:                if (RF_CREATE_THREAD(raidPtr->engine_thread,
                    182:                    rf_DAGExecutionThread, raidPtr, &raidname[0])) {
                    183:                        RF_ERRORMSG("RAIDFRAME: Unable to start engine"
                    184:                            " thread\n");
                    185:                        return (ENOMEM);
                    186:                }
                    187:                if (rf_engineDebug) {
                    188:                        printf("raid%d: Engine thread started\n",
                    189:                            raidPtr->raidid);
                    190:                }
                    191:                RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
                    192: #ifdef RAID_AUTOCONFIG
                    193:        }
                    194: #endif
                    195:        /* XXX Something is missing here... */
                    196: #ifdef debug
                    197:        printf("Skipping the WAIT_START !!!\n");
                    198: #endif
                    199:        /* Engine thread is now running and waiting for work. */
                    200:        if (rf_engineDebug) {
                    201:                printf("raid%d: Engine thread running and waiting for events\n",
                    202:                    raidPtr->raidid);
                    203:        }
                    204:        rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);
                    205:        if (rc) {
                    206:                RF_ERRORMSG3("Unable to add to shutdown list file %s line %d"
                    207:                    " rc=%d\n", __FILE__, __LINE__, rc);
                    208:                rf_ShutdownEngine(NULL);
                    209:        }
                    210:        return (rc);
                    211: }
                    212:
                    213: int
                    214: rf_BranchDone(RF_DagNode_t *node)
                    215: {
                    216:        int i;
                    217:
                    218:        /*
                    219:         * Return true if forward execution is completed for a node and it's
                    220:         * succedents.
                    221:         */
                    222:        switch (node->status) {
                    223:        case rf_wait:
                    224:                /* Should never be called in this state. */
                    225:                RF_PANIC();
                    226:                break;
                    227:        case rf_fired:
                    228:                /* Node is currently executing, so we're not done. */
                    229:                return (RF_FALSE);
                    230:        case rf_good:
                    231:                /* For each succedent. */
                    232:                for (i = 0; i < node->numSuccedents; i++)
                    233:                        /* Recursively check branch. */
                    234:                        if (!rf_BranchDone(node->succedents[i]))
                    235:                                return RF_FALSE;
                    236:
                    237:                return RF_TRUE; /*
                    238:                                 * Node and all succedent branches aren't in
                    239:                                 * fired state.
                    240:                                 */
                    241:                break;
                    242:        case rf_bad:
                    243:                /* Succedents can't fire. */
                    244:                return (RF_TRUE);
                    245:        case rf_recover:
                    246:                /* Should never be called in this state. */
                    247:                RF_PANIC();
                    248:                break;
                    249:        case rf_undone:
                    250:        case rf_panic:
                    251:                /* XXX Need to fix this case. */
                    252:                /* For now, assume that we're done. */
                    253:                return (RF_TRUE);
                    254:                break;
                    255:        default:
                    256:                /* Illegal node status. */
                    257:                RF_PANIC();
                    258:                break;
                    259:        }
                    260: }
                    261:
                    262: int
                    263: rf_NodeReady(RF_DagNode_t *node)
                    264: {
                    265:        int ready;
                    266:
                    267:        switch (node->dagHdr->status) {
                    268:        case rf_enable:
                    269:        case rf_rollForward:
                    270:                if ((node->status == rf_wait) &&
                    271:                    (node->numAntecedents == node->numAntDone))
                    272:                        ready = RF_TRUE;
                    273:                else
                    274:                        ready = RF_FALSE;
                    275:                break;
                    276:        case rf_rollBackward:
                    277:                RF_ASSERT(node->numSuccDone <= node->numSuccedents);
                    278:                RF_ASSERT(node->numSuccFired <= node->numSuccedents);
                    279:                RF_ASSERT(node->numSuccFired <= node->numSuccDone);
                    280:                if ((node->status == rf_good) &&
                    281:                    (node->numSuccDone == node->numSuccedents))
                    282:                        ready = RF_TRUE;
                    283:                else
                    284:                        ready = RF_FALSE;
                    285:                break;
                    286:        default:
                    287:                printf("Execution engine found illegal DAG status"
                    288:                    " in rf_NodeReady\n");
                    289:                RF_PANIC();
                    290:                break;
                    291:        }
                    292:
                    293:        return (ready);
                    294: }
                    295:
                    296:
                    297: /*
                    298:  * User context and dag-exec-thread context:
                    299:  * Fire a node. The node's status field determines which function, do or undo,
                    300:  * to be fired.
                    301:  * This routine assumes that the node's status field has alread been set to
                    302:  * "fired" or "recover" to indicate the direction of execution.
                    303:  */
                    304: void
                    305: rf_FireNode(RF_DagNode_t *node)
                    306: {
                    307:        switch (node->status) {
                    308:        case rf_fired:
                    309:                /* Fire the do function of a node. */
                    310:                if (rf_engineDebug>1) {
                    311:                        printf("raid%d: Firing node 0x%lx (%s)\n",
                    312:                            node->dagHdr->raidPtr->raidid,
                    313:                            (unsigned long) node, node->name);
                    314:                }
                    315:                if (node->flags & RF_DAGNODE_FLAG_YIELD) {
                    316: #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
                    317:                        /* thread_block(); */
                    318:                        /* printf("Need to block the thread here...\n");  */
                    319:                        /*
                    320:                         * XXX thread_block is actually mentioned in
                    321:                         * /usr/include/vm/vm_extern.h
                    322:                         */
                    323: #else
                    324:                        thread_block();
                    325: #endif
                    326:                }
                    327:                (*(node->doFunc)) (node);
                    328:                break;
                    329:        case rf_recover:
                    330:                /* Fire the undo function of a node. */
                    331:                if (rf_engineDebug>1) {
                    332:                        printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
                    333:                            node->dagHdr->raidPtr->raidid,
                    334:                            (unsigned long) node, node->name);
                    335:                }
                    336:                if (node->flags & RF_DAGNODE_FLAG_YIELD) {
                    337: #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
                    338:                        /* thread_block(); */
                    339:                        /* printf("Need to block the thread here...\n"); */
                    340:                        /*
                    341:                         * XXX thread_block is actually mentioned in
                    342:                         * /usr/include/vm/vm_extern.h
                    343:                         */
                    344: #else
                    345:                        thread_block();
                    346: #endif
                    347:                }
                    348:                (*(node->undoFunc)) (node);
                    349:                break;
                    350:        default:
                    351:                RF_PANIC();
                    352:                break;
                    353:        }
                    354: }
                    355:
                    356:
                    357: /*
                    358:  * User context:
                    359:  * Attempt to fire each node in a linear array.
                    360:  * The entire list is fired atomically.
                    361:  */
                    362: void
                    363: rf_FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
                    364: {
                    365:        RF_DagStatus_t dstat;
                    366:        RF_DagNode_t *node;
                    367:        int i, j;
                    368:
                    369:        /* First, mark all nodes which are ready to be fired. */
                    370:        for (i = 0; i < numNodes; i++) {
                    371:                node = nodeList[i];
                    372:                dstat = node->dagHdr->status;
                    373:                RF_ASSERT((node->status == rf_wait) ||
                    374:                    (node->status == rf_good));
                    375:                if (rf_NodeReady(node)) {
                    376:                        if ((dstat == rf_enable) || (dstat == rf_rollForward)) {
                    377:                                RF_ASSERT(node->status == rf_wait);
                    378:                                if (node->commitNode)
                    379:                                        node->dagHdr->numCommits++;
                    380:                                node->status = rf_fired;
                    381:                                for (j = 0; j < node->numAntecedents; j++)
                    382:                                        node->antecedents[j]->numSuccFired++;
                    383:                        } else {
                    384:                                RF_ASSERT(dstat == rf_rollBackward);
                    385:                                RF_ASSERT(node->status == rf_good);
                    386:                                /* Only one commit node per graph. */
                    387:                                RF_ASSERT(node->commitNode == RF_FALSE);
                    388:                                node->status = rf_recover;
                    389:                        }
                    390:                }
                    391:        }
                    392:        /* Now, fire the nodes. */
                    393:        for (i = 0; i < numNodes; i++) {
                    394:                if ((nodeList[i]->status == rf_fired) ||
                    395:                    (nodeList[i]->status == rf_recover))
                    396:                        rf_FireNode(nodeList[i]);
                    397:        }
                    398: }
                    399:
                    400:
                    401: /*
                    402:  * User context:
                    403:  * Attempt to fire each node in a linked list.
                    404:  * The entire list is fired atomically.
                    405:  */
                    406: void
                    407: rf_FireNodeList(RF_DagNode_t *nodeList)
                    408: {
                    409:        RF_DagNode_t *node, *next;
                    410:        RF_DagStatus_t dstat;
                    411:        int j;
                    412:
                    413:        if (nodeList) {
                    414:                /* First, mark all nodes which are ready to be fired. */
                    415:                for (node = nodeList; node; node = next) {
                    416:                        next = node->next;
                    417:                        dstat = node->dagHdr->status;
                    418:                        RF_ASSERT((node->status == rf_wait) ||
                    419:                            (node->status == rf_good));
                    420:                        if (rf_NodeReady(node)) {
                    421:                                if ((dstat == rf_enable) ||
                    422:                                    (dstat == rf_rollForward)) {
                    423:                                        RF_ASSERT(node->status == rf_wait);
                    424:                                        if (node->commitNode)
                    425:                                                node->dagHdr->numCommits++;
                    426:                                        node->status = rf_fired;
                    427:                                        for (j = 0; j < node->numAntecedents;
                    428:                                             j++)
                    429:                                                node->antecedents[j]
                    430:                                                    ->numSuccFired++;
                    431:                                } else {
                    432:                                        RF_ASSERT(dstat == rf_rollBackward);
                    433:                                        RF_ASSERT(node->status == rf_good);
                    434:                                        /* Only one commit node per graph. */
                    435:                                        RF_ASSERT(node->commitNode == RF_FALSE);
                    436:                                        node->status = rf_recover;
                    437:                                }
                    438:                        }
                    439:                }
                    440:                /* Now, fire the nodes. */
                    441:                for (node = nodeList; node; node = next) {
                    442:                        next = node->next;
                    443:                        if ((node->status == rf_fired) ||
                    444:                            (node->status == rf_recover))
                    445:                                rf_FireNode(node);
                    446:                }
                    447:        }
                    448: }
                    449:
                    450:
                    451: /*
                    452:  * Interrupt context:
                    453:  * For each succedent,
                    454:  *    propagate required results from node to succedent.
                    455:  *    increment succedent's numAntDone.
                    456:  *    place newly-enable nodes on node queue for firing.
                    457:  *
                    458:  * To save context switches, we don't place NIL nodes on the node queue,
                    459:  * but rather just process them as if they had fired. Note that NIL nodes
                    460:  * that are the direct successors of the header will actually get fired by
                    461:  * DispatchDAG, which is fine because no context switches are involved.
                    462:  *
                    463:  * Important:  when running at user level, this can be called by any
                    464:  * disk thread, and so the increment and check of the antecedent count
                    465:  * must be locked. I used the node queue mutex and locked down the
                    466:  * entire function, but this is certainly overkill.
                    467:  */
                    468: void
                    469: rf_PropagateResults(RF_DagNode_t *node, int context)
                    470: {
                    471:        RF_DagNode_t *s, *a;
                    472:        RF_Raid_t *raidPtr;
                    473:        int i, ks;
                    474:        /* A list of NIL nodes to be finished. */
                    475:        RF_DagNode_t *finishlist = NULL;
                    476:        /* List of nodes with failed truedata antecedents. */
                    477:        RF_DagNode_t *skiplist = NULL;
                    478:        RF_DagNode_t *firelist = NULL;  /* A list of nodes to be fired. */
                    479:        RF_DagNode_t *q = NULL, *qh = NULL, *next;
                    480:        int j, skipNode;
                    481:
                    482:        raidPtr = node->dagHdr->raidPtr;
                    483:
                    484:        DO_LOCK(raidPtr);
                    485:
                    486:        /* Debug - validate fire counts. */
                    487:        for (i = 0; i < node->numAntecedents; i++) {
                    488:                a = *(node->antecedents + i);
                    489:                RF_ASSERT(a->numSuccFired >= a->numSuccDone);
                    490:                RF_ASSERT(a->numSuccFired <= a->numSuccedents);
                    491:                a->numSuccDone++;
                    492:        }
                    493:
                    494:        switch (node->dagHdr->status) {
                    495:        case rf_enable:
                    496:        case rf_rollForward:
                    497:                for (i = 0; i < node->numSuccedents; i++) {
                    498:                        s = *(node->succedents + i);
                    499:                        RF_ASSERT(s->status == rf_wait);
                    500:                        (s->numAntDone)++;
                    501:                        if (s->numAntDone == s->numAntecedents) {
                    502:                                /* Look for NIL nodes. */
                    503:                                if (s->doFunc == rf_NullNodeFunc) {
                    504:                                        /*
                    505:                                         * Don't fire NIL nodes, just process
                    506:                                         * them.
                    507:                                         */
                    508:                                        s->next = finishlist;
                    509:                                        finishlist = s;
                    510:                                } else {
                    511:                                        /*
                    512:                                         * Look to see if the node is to be
                    513:                                         * skipped.
                    514:                                         */
                    515:                                        skipNode = RF_FALSE;
                    516:                                        for (j = 0; j < s->numAntecedents; j++)
                    517:                                                if ((s->antType[j] ==
                    518:                                                     rf_trueData) &&
                    519:                                                    (s->antecedents[j]->status
                    520:                                                     == rf_bad))
                    521:                                                        skipNode = RF_TRUE;
                    522:                                        if (skipNode) {
                    523:                                                /*
                    524:                                                 * This node has one or more
                    525:                                                 * failed true data
                    526:                                                 * dependencies, so skip it.
                    527:                                                 */
                    528:                                                s->next = skiplist;
                    529:                                                skiplist = s;
                    530:                                        } else {
                    531:                                                /*
                    532:                                                 * Add s to list of nodes (q)
                    533:                                                 * to execute.
                    534:                                                 */
                    535:                                                if (context != RF_INTR_CONTEXT)
                    536:                                                {
                    537:                                                        /*
                    538:                                                         * We only have to
                    539:                                                         * enqueue if we're at
                    540:                                                         * intr context.
                    541:                                                         */
                    542:                                                        /*
                    543:                                                         * Put node on a list to
                    544:                                                         * be fired after we
                    545:                                                         * unlock.
                    546:                                                         */
                    547:                                                        s->next = firelist;
                    548:                                                        firelist = s;
                    549:                                                } else {
                    550:                                                        /*
                    551:                                                         * Enqueue the node for
                    552:                                                         * the dag exec thread
                    553:                                                         * to fire.
                    554:                                                         */
                    555:                                                     RF_ASSERT(rf_NodeReady(s));
                    556:                                                        if (q) {
                    557:                                                                q->next = s;
                    558:                                                                q = s;
                    559:                                                        } else {
                    560:                                                                qh = q = s;
                    561:                                                                qh->next = NULL;
                    562:                                                        }
                    563:                                                }
                    564:                                        }
                    565:                                }
                    566:                        }
                    567:                }
                    568:
                    569:                if (q) {
                    570:                        /*
                    571:                         * Transfer our local list of nodes to the node
                    572:                         * queue.
                    573:                         */
                    574:                        q->next = raidPtr->node_queue;
                    575:                        raidPtr->node_queue = qh;
                    576:                        DO_SIGNAL(raidPtr);
                    577:                }
                    578:                DO_UNLOCK(raidPtr);
                    579:
                    580:                for (; skiplist; skiplist = next) {
                    581:                        next = skiplist->next;
                    582:                        skiplist->status = rf_skipped;
                    583:                        for (i = 0; i < skiplist->numAntecedents; i++) {
                    584:                                skiplist->antecedents[i]->numSuccFired++;
                    585:                        }
                    586:                        if (skiplist->commitNode) {
                    587:                                skiplist->dagHdr->numCommits++;
                    588:                        }
                    589:                        rf_FinishNode(skiplist, context);
                    590:                }
                    591:                for (; finishlist; finishlist = next) {
                    592:                        /* NIL nodes: no need to fire them. */
                    593:                        next = finishlist->next;
                    594:                        finishlist->status = rf_good;
                    595:                        for (i = 0; i < finishlist->numAntecedents; i++) {
                    596:                                finishlist->antecedents[i]->numSuccFired++;
                    597:                        }
                    598:                        if (finishlist->commitNode)
                    599:                                finishlist->dagHdr->numCommits++;
                    600:                        /*
                    601:                         * Okay, here we're calling rf_FinishNode() on nodes
                    602:                         * that have the null function as their work proc.
                    603:                         * Such a node could be the terminal node in a DAG.
                    604:                         * If so, it will cause the DAG to complete, which will
                    605:                         * in turn free memory used by the DAG, which includes
                    606:                         * the node in question.
                    607:                         * Thus, we must avoid referencing the node at all
                    608:                         * after calling rf_FinishNode() on it.
                    609:                         */
                    610:                        /* Recursive call. */
                    611:                        rf_FinishNode(finishlist, context);
                    612:                }
                    613:                /* Fire all nodes in firelist. */
                    614:                rf_FireNodeList(firelist);
                    615:                break;
                    616:
                    617:        case rf_rollBackward:
                    618:                for (i = 0; i < node->numAntecedents; i++) {
                    619:                        a = *(node->antecedents + i);
                    620:                        RF_ASSERT(a->status == rf_good);
                    621:                        RF_ASSERT(a->numSuccDone <= a->numSuccedents);
                    622:                        RF_ASSERT(a->numSuccDone <= a->numSuccFired);
                    623:
                    624:                        if (a->numSuccDone == a->numSuccFired) {
                    625:                                if (a->undoFunc == rf_NullNodeFunc) {
                    626:                                        /*
                    627:                                         * Don't fire NIL nodes, just process
                    628:                                         * them.
                    629:                                         */
                    630:                                        a->next = finishlist;
                    631:                                        finishlist = a;
                    632:                                } else {
                    633:                                        if (context != RF_INTR_CONTEXT) {
                    634:                                                /*
                    635:                                                 * We only have to enqueue if
                    636:                                                 * we're at intr context.
                    637:                                                 */
                    638:                                                /*
                    639:                                                 * Put node on a list to
                    640:                                                 * be fired after we
                    641:                                                 * unlock.
                    642:                                                 */
                    643:                                                a->next = firelist;
                    644:                                                firelist = a;
                    645:                                        } else {
                    646:                                                /*
                    647:                                                 * Enqueue the node for
                    648:                                                 * the dag exec thread
                    649:                                                 * to fire.
                    650:                                                 */
                    651:                                                RF_ASSERT(rf_NodeReady(a));
                    652:                                                if (q) {
                    653:                                                        q->next = a;
                    654:                                                        q = a;
                    655:                                                } else {
                    656:                                                        qh = q = a;
                    657:                                                        qh->next = NULL;
                    658:                                                }
                    659:                                        }
                    660:                                }
                    661:                        }
                    662:                }
                    663:                if (q) {
                    664:                        /*
                    665:                         * Transfer our local list of nodes to the node
                    666:                         * queue.
                    667:                         */
                    668:                        q->next = raidPtr->node_queue;
                    669:                        raidPtr->node_queue = qh;
                    670:                        DO_SIGNAL(raidPtr);
                    671:                }
                    672:                DO_UNLOCK(raidPtr);
                    673:                for (; finishlist; finishlist = next) {
                    674:                        /* NIL nodes: no need to fire them. */
                    675:                        next = finishlist->next;
                    676:                        finishlist->status = rf_good;
                    677:                        /*
                    678:                         * Okay, here we're calling rf_FinishNode() on nodes
                    679:                         * that have the null function as their work proc.
                    680:                         * Such a node could be the first node in a DAG.
                    681:                         * If so, it will cause the DAG to complete, which will
                    682:                         * in turn free memory used by the DAG, which includes
                    683:                         * the node in question.
                    684:                         * Thus, we must avoid referencing the node at all
                    685:                         * after calling rf_FinishNode() on it.
                    686:                         */
                    687:                        rf_FinishNode(finishlist, context);
                    688:                        /* Recursive call. */
                    689:                }
                    690:                /* Fire all nodes in firelist. */
                    691:                rf_FireNodeList(firelist);
                    692:
                    693:                break;
                    694:        default:
                    695:                printf("Engine found illegal DAG status in"
                    696:                    " rf_PropagateResults()\n");
                    697:                RF_PANIC();
                    698:                break;
                    699:        }
                    700: }
                    701:
                    702:
                    703: /*
                    704:  * Process a fired node which has completed.
                    705:  */
                    706: void
                    707: rf_ProcessNode(RF_DagNode_t *node, int context)
                    708: {
                    709:        RF_Raid_t *raidPtr;
                    710:
                    711:        raidPtr = node->dagHdr->raidPtr;
                    712:
                    713:        switch (node->status) {
                    714:        case rf_good:
                    715:                /* Normal case, don't need to do anything. */
                    716:                break;
                    717:        case rf_bad:
                    718:                if ((node->dagHdr->numCommits > 0) ||
                    719:                    (node->dagHdr->numCommitNodes == 0)) {
                    720:                        /* Crossed commit barrier. */
                    721:                        node->dagHdr->status = rf_rollForward;
                    722:                        if (rf_engineDebug || 1) {
                    723:                                printf("raid%d: node (%s) returned fail,"
                    724:                                    " rolling forward\n", raidPtr->raidid,
                    725:                                    node->name);
                    726:                        }
                    727:                } else {
                    728:                        /* Never reached commit barrier. */
                    729:                        node->dagHdr->status = rf_rollBackward;
                    730:                        if (rf_engineDebug || 1) {
                    731:                                printf("raid%d: node (%s) returned fail,"
                    732:                                    " rolling backward\n", raidPtr->raidid,
                    733:                                    node->name);
                    734:                        }
                    735:                }
                    736:                break;
                    737:        case rf_undone:
                    738:                /* Normal rollBackward case, don't need to do anything. */
                    739:                break;
                    740:        case rf_panic:
                    741:                /* An undo node failed !!! */
                    742:                printf("UNDO of a node failed !!!/n");
                    743:                break;
                    744:        default:
                    745:                printf("node finished execution with an illegal status !!!\n");
                    746:                RF_PANIC();
                    747:                break;
                    748:        }
                    749:
                    750:        /*
                    751:         * Enqueue node's succedents (antecedents if rollBackward) for
                    752:         * execution.
                    753:         */
                    754:        rf_PropagateResults(node, context);
                    755: }
                    756:
                    757:
                    758: /*
                    759:  * User context or dag-exec-thread context:
                    760:  * This is the first step in post-processing a newly-completed node.
                    761:  * This routine is called by each node execution function to mark the node
                    762:  * as complete and fire off any successors that have been enabled.
                    763:  */
                    764: int
                    765: rf_FinishNode(RF_DagNode_t *node, int context)
                    766: {
                    767:        /* As far as I can tell, retcode is not used -wvcii. */
                    768:        int retcode = RF_FALSE;
                    769:        node->dagHdr->numNodesCompleted++;
                    770:        rf_ProcessNode(node, context);
                    771:
                    772:        return (retcode);
                    773: }
                    774:
                    775:
                    776: /*
                    777:  * User context:
                    778:  * Submit dag for execution, return non-zero if we have to wait for completion.
                    779:  * If and only if we return non-zero, we'll cause cbFunc to get invoked with
                    780:  * cbArg when the DAG has completed.
                    781:  *
                    782:  * For now we always return 1. If the DAG does not cause any I/O, then the
                    783:  * callback may get invoked before DispatchDAG returns. There's code in state
                    784:  * 5 of ContinueRaidAccess to handle this.
                    785:  *
                    786:  * All we do here is fire the direct successors of the header node. The DAG
                    787:  * execution thread does the rest of the dag processing.
                    788:  */
                    789: int
                    790: rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *), void *cbArg)
                    791: {
                    792:        RF_Raid_t *raidPtr;
                    793:
                    794:        raidPtr = dag->raidPtr;
                    795:        if (dag->tracerec) {
                    796:                RF_ETIMER_START(dag->tracerec->timer);
                    797:        }
                    798:        if (rf_engineDebug || rf_validateDAGDebug) {
                    799:                if (rf_ValidateDAG(dag))
                    800:                        RF_PANIC();
                    801:        }
                    802:        if (rf_engineDebug>1) {
                    803:                printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
                    804:        }
                    805:        raidPtr->dags_in_flight++;      /*
                    806:                                         * Debug only:  blow off proper
                    807:                                         * locking.
                    808:                                         */
                    809:        dag->cbFunc = cbFunc;
                    810:        dag->cbArg = cbArg;
                    811:        dag->numNodesCompleted = 0;
                    812:        dag->status = rf_enable;
                    813:        rf_FireNodeArray(dag->numSuccedents, dag->succedents);
                    814:        return (1);
                    815: }
                    816:
                    817:
                    818: /*
                    819:  * Dedicated kernel thread:
                    820:  * The thread that handles all DAG node firing.
                    821:  * To minimize locking and unlocking, we grab a copy of the entire node queue
                    822:  * and then set the node queue to NULL before doing any firing of nodes.
                    823:  * This way we only have to release the lock once. Of course, it's probably
                    824:  * rare that there's more than one node in the queue at any one time, but it
                    825:  * sometimes happens.
                    826:  *
                    827:  * In the kernel, this thread runs at spl0 and is not swappable. I copied these
                    828:  * characteristics from the aio_completion_thread.
                    829:  */
                    830:
                    831: #ifdef RAID_AUTOCONFIG
                    832: void
                    833: rf_DAGExecutionThread_pre(RF_ThreadArg_t arg)
                    834: {
                    835:        RF_Raid_t *raidPtr;
                    836:        char raidname[16];
                    837:        pid_t oldpid = lastpid;
                    838:
                    839:        raidPtr = (RF_Raid_t *) arg;
                    840:
                    841:        if (rf_engineDebug) {
                    842:                printf("raid%d: Starting engine thread\n", raidPtr->raidid);
                    843:        }
                    844:
                    845:        lastpid = RF_ENGINE_PID + raidPtr->raidid - 1;
                    846:        snprintf(raidname, sizeof raidname, "raid%d", raidPtr->raidid);
                    847:
                    848:        if (RF_CREATE_THREAD(raidPtr->engine_thread, rf_DAGExecutionThread,
                    849:            raidPtr, &raidname[0])) {
                    850:                RF_ERRORMSG("RAIDFRAME: Unable to start engine thread\n");
                    851:                return;
                    852:        }
                    853:
                    854:        lastpid = oldpid;
                    855:        if (rf_engineDebug) {
                    856:                printf("raid%d: Engine thread started\n", raidPtr->raidid);
                    857:        }
                    858:        RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
                    859: }
                    860: #endif /* RAID_AUTOCONFIG */
                    861:
                    862: void
                    863: rf_DAGExecutionThread(RF_ThreadArg_t arg)
                    864: {
                    865:        RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
                    866:        RF_Raid_t *raidPtr;
                    867:        int ks;
                    868:        int s;
                    869:
                    870:        raidPtr = (RF_Raid_t *) arg;
                    871:
                    872:        while (!(&raidPtr->engine_tg)->created)
                    873:                (void) tsleep((void *)&(&raidPtr->engine_tg)->created, PWAIT,
                    874:                                "raidinit", 0);
                    875:
                    876:        if (rf_engineDebug) {
                    877:                printf("raid%d: Engine thread is running\n", raidPtr->raidid);
                    878:        }
                    879:        /* XXX What to put here ? XXX */
                    880:
                    881:        s = splbio();
                    882:
                    883:        RF_THREADGROUP_RUNNING(&raidPtr->engine_tg);
                    884:
                    885:        rf_hook_cookies[raidPtr->raidid] =
                    886:                shutdownhook_establish(rf_shutdown_hook, (void *)raidPtr);
                    887:
                    888:        DO_LOCK(raidPtr);
                    889:        while (!raidPtr->shutdown_engine) {
                    890:
                    891:                while (raidPtr->node_queue != NULL) {
                    892:                        local_nq = raidPtr->node_queue;
                    893:                        fire_nq = NULL;
                    894:                        term_nq = NULL;
                    895:                        raidPtr->node_queue = NULL;
                    896:                        DO_UNLOCK(raidPtr);
                    897:
                    898:                        /* First, strip out the terminal nodes. */
                    899:                        while (local_nq) {
                    900:                                nd = local_nq;
                    901:                                local_nq = local_nq->next;
                    902:                                switch (nd->dagHdr->status) {
                    903:                                case rf_enable:
                    904:                                case rf_rollForward:
                    905:                                        if (nd->numSuccedents == 0) {
                    906:                                                /*
                    907:                                                 * End of the dag, add to
                    908:                                                 * callback list.
                    909:                                                 */
                    910:                                                nd->next = term_nq;
                    911:                                                term_nq = nd;
                    912:                                        } else {
                    913:                                                /*
                    914:                                                 * Not the end, add to the
                    915:                                                 * fire queue.
                    916:                                                 */
                    917:                                                nd->next = fire_nq;
                    918:                                                fire_nq = nd;
                    919:                                        }
                    920:                                        break;
                    921:                                case rf_rollBackward:
                    922:                                        if (nd->numAntecedents == 0) {
                    923:                                                /*
                    924:                                                 * End of the dag, add to the
                    925:                                                 * callback list.
                    926:                                                 */
                    927:                                                nd->next = term_nq;
                    928:                                                term_nq = nd;
                    929:                                        } else {
                    930:                                                /*
                    931:                                                 * Not the end, add to the
                    932:                                                 * fire queue.
                    933:                                                 */
                    934:                                                nd->next = fire_nq;
                    935:                                                fire_nq = nd;
                    936:                                        }
                    937:                                        break;
                    938:                                default:
                    939:                                        RF_PANIC();
                    940:                                        break;
                    941:                                }
                    942:                        }
                    943:
                    944:                        /*
                    945:                         * Execute callback of dags which have reached the
                    946:                         * terminal node.
                    947:                         */
                    948:                        while (term_nq) {
                    949:                                nd = term_nq;
                    950:                                term_nq = term_nq->next;
                    951:                                nd->next = NULL;
                    952:                                (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
                    953:                                raidPtr->dags_in_flight--; /* Debug only. */
                    954:                        }
                    955:
                    956:                        /* Fire remaining nodes. */
                    957:                        rf_FireNodeList(fire_nq);
                    958:
                    959:                        DO_LOCK(raidPtr);
                    960:                }
                    961:                while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL)
                    962:                        DO_WAIT(raidPtr);
                    963:        }
                    964:        DO_UNLOCK(raidPtr);
                    965:
                    966:        if (rf_hook_cookies && rf_hook_cookies[raidPtr->raidid] != NULL) {
                    967:                shutdownhook_disestablish(rf_hook_cookies[raidPtr->raidid]);
                    968:                rf_hook_cookies[raidPtr->raidid] = NULL;
                    969:        }
                    970:
                    971:        RF_THREADGROUP_DONE(&raidPtr->engine_tg);
                    972:
                    973:        splx(s);
                    974:        kthread_exit(0);
                    975: }

CVSweb