Annotation of sys/dev/raidframe/rf_engine.c, Revision 1.1
1.1 ! nbrk 1: /* $OpenBSD: rf_engine.c,v 1.15 2003/04/27 11:22:54 ho Exp $ */
! 2: /* $NetBSD: rf_engine.c,v 1.10 2000/08/20 16:51:03 thorpej Exp $ */
! 3:
! 4: /*
! 5: * Copyright (c) 1995 Carnegie-Mellon University.
! 6: * All rights reserved.
! 7: *
! 8: * Author: William V. Courtright II, Mark Holland, Rachad Youssef
! 9: *
! 10: * Permission to use, copy, modify and distribute this software and
! 11: * its documentation is hereby granted, provided that both the copyright
! 12: * notice and this permission notice appear in all copies of the
! 13: * software, derivative works or modified versions, and any portions
! 14: * thereof, and that both notices appear in supporting documentation.
! 15: *
! 16: * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
! 17: * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
! 18: * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
! 19: *
! 20: * Carnegie Mellon requests users of this software to return to
! 21: *
! 22: * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
! 23: * School of Computer Science
! 24: * Carnegie Mellon University
! 25: * Pittsburgh PA 15213-3890
! 26: *
! 27: * any improvements or extensions that they make and grant Carnegie the
! 28: * rights to redistribute these changes.
! 29: */
! 30:
! 31: /****************************************************************************
! 32: * *
! 33: * engine.c -- Code for DAG execution engine. *
! 34: * *
! 35: * Modified to work as follows (holland): *
! 36: * A user-thread calls into DispatchDAG, which fires off the nodes that *
! 37: * are direct successors to the header node. DispatchDAG then returns, *
! 38: * and the rest of the I/O continues asynchronously. As each node *
! 39: * completes, the node execution function calls FinishNode(). FinishNode *
! 40: * scans the list of successors to the node and increments the antecedent *
! 41: * counts. Each node that becomes enabled is placed on a central node *
! 42: * queue. A dedicated dag-execution thread grabs nodes off of this *
! 43: * queue and fires them. *
! 44: * *
! 45: * NULL nodes are never fired. *
! 46: * *
! 47: * Terminator nodes are never fired, but rather cause the callback *
! 48: * associated with the DAG to be invoked. *
! 49: * *
! 50: * If a node fails, the dag either rolls forward to the completion or *
! 51: * rolls back, undoing previously-completed nodes and fails atomically. *
! 52: * The direction of recovery is determined by the location of the failed *
! 53: * node in the graph. If the failure occurred before the commit node in *
! 54: * the graph, backward recovery is used. Otherwise, forward recovery is *
! 55: * used. *
! 56: * *
! 57: ****************************************************************************/
! 58:
! 59: #include "rf_threadstuff.h"
! 60:
! 61: #include <sys/errno.h>
! 62:
! 63: #include "rf_dag.h"
! 64: #include "rf_engine.h"
! 65: #include "rf_etimer.h"
! 66: #include "rf_general.h"
! 67: #include "rf_dagutils.h"
! 68: #include "rf_shutdown.h"
! 69: #include "rf_raid.h"
! 70:
! 71: int rf_BranchDone(RF_DagNode_t *);
! 72: int rf_NodeReady(RF_DagNode_t *);
! 73: void rf_FireNode(RF_DagNode_t *);
! 74: void rf_FireNodeArray(int, RF_DagNode_t **);
! 75: void rf_FireNodeList(RF_DagNode_t *);
! 76: void rf_PropagateResults(RF_DagNode_t *, int);
! 77: void rf_ProcessNode(RF_DagNode_t *, int);
! 78:
! 79: void rf_DAGExecutionThread(RF_ThreadArg_t);
! 80: #ifdef RAID_AUTOCONFIG
! 81: #define RF_ENGINE_PID 10
! 82: void rf_DAGExecutionThread_pre(RF_ThreadArg_t);
! 83: extern pid_t lastpid;
! 84: #endif /* RAID_AUTOCONFIG */
! 85: void **rf_hook_cookies;
! 86: extern int numraid;
! 87:
/*
 * Create the node-queue mutex and condition variable of _r_, handing _l_
 * to both creation routines.
 *
 * NOTE: on failure this macro executes a `return' in the *calling*
 * function, with the non-zero code reported by the failing routine.
 */
#define	DO_INIT(_l_,_r_)						\
do {									\
	int _err;							\
									\
	_err = rf_create_managed_mutex(_l_,				\
	    &(_r_)->node_queue_mutex);					\
	if (_err != 0)							\
		return (_err);						\
	_err = rf_create_managed_cond(_l_,				\
	    &(_r_)->node_queue_cond);					\
	if (_err != 0)							\
		return (_err);						\
} while (0)
! 100:
/*
 * Synchronization primitives for this file.
 *
 * DO_LOCK/DO_UNLOCK store and restore the interrupt priority level through
 * a variable named `ks', which the calling function must declare as an int.
 * DO_WAIT must always be wrapped in a loop that re-tests the awaited
 * condition.
 */

/*
 * XXX Is raising the spl here really necessary?
 */
#define	DO_LOCK(_r_)							\
do {									\
	ks = splbio();							\
	RF_LOCK_MUTEX((_r_)->node_queue_mutex);				\
} while (0)

/* Mirror image of DO_LOCK: drop the mutex first, then restore the spl. */
#define	DO_UNLOCK(_r_)							\
do {									\
	RF_UNLOCK_MUTEX((_r_)->node_queue_mutex);			\
	splx(ks);							\
} while (0)

/*
 * Wait on the node_queue member of _r_, passing node_queue_mutex along.
 * NOTE(review): node_queue_cond is created by DO_INIT but is not the
 * object waited on here -- confirm against RF_WAIT_COND.
 */
#define	DO_WAIT(_r_)							\
	RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)

/* Wake every waiter of DO_WAIT.  XXX Would RF_SIGNAL_COND be enough? */
#define	DO_SIGNAL(_r_)							\
	RF_BROADCAST_COND((_r_)->node_queue)
! 127:
! 128: void rf_ShutdownEngine(void *);
! 129:
! 130: void
! 131: rf_ShutdownEngine(void *arg)
! 132: {
! 133: RF_Raid_t *raidPtr;
! 134:
! 135: raidPtr = (RF_Raid_t *) arg;
! 136: raidPtr->shutdown_engine = 1;
! 137: DO_SIGNAL(raidPtr);
! 138: }
! 139:
! 140: int
! 141: rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
! 142: RF_Config_t *cfgPtr)
! 143: {
! 144: int rc;
! 145: char raidname[16];
! 146:
! 147: DO_INIT(listp, raidPtr);
! 148:
! 149: raidPtr->node_queue = NULL;
! 150: raidPtr->dags_in_flight = 0;
! 151:
! 152: rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg);
! 153: if (rc)
! 154: return (rc);
! 155:
! 156: /*
! 157: * We create the execution thread only once per system boot. No need
! 158: * to check return code b/c the kernel panics if it can't create the
! 159: * thread.
! 160: */
! 161: if (rf_engineDebug) {
! 162: printf("raid%d: %s engine thread\n", raidPtr->raidid,
! 163: (initproc)?"Starting":"Creating");
! 164: }
! 165: if (rf_hook_cookies == NULL) {
! 166: rf_hook_cookies =
! 167: malloc(numraid * sizeof(void *),
! 168: M_RAIDFRAME, M_NOWAIT);
! 169: if (rf_hook_cookies == NULL)
! 170: return (ENOMEM);
! 171: bzero(rf_hook_cookies, numraid * sizeof(void *));
! 172: }
! 173: #ifdef RAID_AUTOCONFIG
! 174: if (initproc == NULL) {
! 175: rf_hook_cookies[raidPtr->raidid] =
! 176: startuphook_establish(rf_DAGExecutionThread_pre,
! 177: raidPtr);
! 178: } else {
! 179: #endif /* RAID_AUTOCONFIG */
! 180: snprintf(&raidname[0], 16, "raid%d", raidPtr->raidid);
! 181: if (RF_CREATE_THREAD(raidPtr->engine_thread,
! 182: rf_DAGExecutionThread, raidPtr, &raidname[0])) {
! 183: RF_ERRORMSG("RAIDFRAME: Unable to start engine"
! 184: " thread\n");
! 185: return (ENOMEM);
! 186: }
! 187: if (rf_engineDebug) {
! 188: printf("raid%d: Engine thread started\n",
! 189: raidPtr->raidid);
! 190: }
! 191: RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
! 192: #ifdef RAID_AUTOCONFIG
! 193: }
! 194: #endif
! 195: /* XXX Something is missing here... */
! 196: #ifdef debug
! 197: printf("Skipping the WAIT_START !!!\n");
! 198: #endif
! 199: /* Engine thread is now running and waiting for work. */
! 200: if (rf_engineDebug) {
! 201: printf("raid%d: Engine thread running and waiting for events\n",
! 202: raidPtr->raidid);
! 203: }
! 204: rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);
! 205: if (rc) {
! 206: RF_ERRORMSG3("Unable to add to shutdown list file %s line %d"
! 207: " rc=%d\n", __FILE__, __LINE__, rc);
! 208: rf_ShutdownEngine(NULL);
! 209: }
! 210: return (rc);
! 211: }
! 212:
! 213: int
! 214: rf_BranchDone(RF_DagNode_t *node)
! 215: {
! 216: int i;
! 217:
! 218: /*
! 219: * Return true if forward execution is completed for a node and it's
! 220: * succedents.
! 221: */
! 222: switch (node->status) {
! 223: case rf_wait:
! 224: /* Should never be called in this state. */
! 225: RF_PANIC();
! 226: break;
! 227: case rf_fired:
! 228: /* Node is currently executing, so we're not done. */
! 229: return (RF_FALSE);
! 230: case rf_good:
! 231: /* For each succedent. */
! 232: for (i = 0; i < node->numSuccedents; i++)
! 233: /* Recursively check branch. */
! 234: if (!rf_BranchDone(node->succedents[i]))
! 235: return RF_FALSE;
! 236:
! 237: return RF_TRUE; /*
! 238: * Node and all succedent branches aren't in
! 239: * fired state.
! 240: */
! 241: break;
! 242: case rf_bad:
! 243: /* Succedents can't fire. */
! 244: return (RF_TRUE);
! 245: case rf_recover:
! 246: /* Should never be called in this state. */
! 247: RF_PANIC();
! 248: break;
! 249: case rf_undone:
! 250: case rf_panic:
! 251: /* XXX Need to fix this case. */
! 252: /* For now, assume that we're done. */
! 253: return (RF_TRUE);
! 254: break;
! 255: default:
! 256: /* Illegal node status. */
! 257: RF_PANIC();
! 258: break;
! 259: }
! 260: }
! 261:
! 262: int
! 263: rf_NodeReady(RF_DagNode_t *node)
! 264: {
! 265: int ready;
! 266:
! 267: switch (node->dagHdr->status) {
! 268: case rf_enable:
! 269: case rf_rollForward:
! 270: if ((node->status == rf_wait) &&
! 271: (node->numAntecedents == node->numAntDone))
! 272: ready = RF_TRUE;
! 273: else
! 274: ready = RF_FALSE;
! 275: break;
! 276: case rf_rollBackward:
! 277: RF_ASSERT(node->numSuccDone <= node->numSuccedents);
! 278: RF_ASSERT(node->numSuccFired <= node->numSuccedents);
! 279: RF_ASSERT(node->numSuccFired <= node->numSuccDone);
! 280: if ((node->status == rf_good) &&
! 281: (node->numSuccDone == node->numSuccedents))
! 282: ready = RF_TRUE;
! 283: else
! 284: ready = RF_FALSE;
! 285: break;
! 286: default:
! 287: printf("Execution engine found illegal DAG status"
! 288: " in rf_NodeReady\n");
! 289: RF_PANIC();
! 290: break;
! 291: }
! 292:
! 293: return (ready);
! 294: }
! 295:
! 296:
! 297: /*
! 298: * User context and dag-exec-thread context:
! 299: * Fire a node. The node's status field determines which function, do or undo,
! 300: * to be fired.
! 301: * This routine assumes that the node's status field has alread been set to
! 302: * "fired" or "recover" to indicate the direction of execution.
! 303: */
! 304: void
! 305: rf_FireNode(RF_DagNode_t *node)
! 306: {
! 307: switch (node->status) {
! 308: case rf_fired:
! 309: /* Fire the do function of a node. */
! 310: if (rf_engineDebug>1) {
! 311: printf("raid%d: Firing node 0x%lx (%s)\n",
! 312: node->dagHdr->raidPtr->raidid,
! 313: (unsigned long) node, node->name);
! 314: }
! 315: if (node->flags & RF_DAGNODE_FLAG_YIELD) {
! 316: #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
! 317: /* thread_block(); */
! 318: /* printf("Need to block the thread here...\n"); */
! 319: /*
! 320: * XXX thread_block is actually mentioned in
! 321: * /usr/include/vm/vm_extern.h
! 322: */
! 323: #else
! 324: thread_block();
! 325: #endif
! 326: }
! 327: (*(node->doFunc)) (node);
! 328: break;
! 329: case rf_recover:
! 330: /* Fire the undo function of a node. */
! 331: if (rf_engineDebug>1) {
! 332: printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
! 333: node->dagHdr->raidPtr->raidid,
! 334: (unsigned long) node, node->name);
! 335: }
! 336: if (node->flags & RF_DAGNODE_FLAG_YIELD) {
! 337: #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
! 338: /* thread_block(); */
! 339: /* printf("Need to block the thread here...\n"); */
! 340: /*
! 341: * XXX thread_block is actually mentioned in
! 342: * /usr/include/vm/vm_extern.h
! 343: */
! 344: #else
! 345: thread_block();
! 346: #endif
! 347: }
! 348: (*(node->undoFunc)) (node);
! 349: break;
! 350: default:
! 351: RF_PANIC();
! 352: break;
! 353: }
! 354: }
! 355:
! 356:
! 357: /*
! 358: * User context:
! 359: * Attempt to fire each node in a linear array.
! 360: * The entire list is fired atomically.
! 361: */
! 362: void
! 363: rf_FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
! 364: {
! 365: RF_DagStatus_t dstat;
! 366: RF_DagNode_t *node;
! 367: int i, j;
! 368:
! 369: /* First, mark all nodes which are ready to be fired. */
! 370: for (i = 0; i < numNodes; i++) {
! 371: node = nodeList[i];
! 372: dstat = node->dagHdr->status;
! 373: RF_ASSERT((node->status == rf_wait) ||
! 374: (node->status == rf_good));
! 375: if (rf_NodeReady(node)) {
! 376: if ((dstat == rf_enable) || (dstat == rf_rollForward)) {
! 377: RF_ASSERT(node->status == rf_wait);
! 378: if (node->commitNode)
! 379: node->dagHdr->numCommits++;
! 380: node->status = rf_fired;
! 381: for (j = 0; j < node->numAntecedents; j++)
! 382: node->antecedents[j]->numSuccFired++;
! 383: } else {
! 384: RF_ASSERT(dstat == rf_rollBackward);
! 385: RF_ASSERT(node->status == rf_good);
! 386: /* Only one commit node per graph. */
! 387: RF_ASSERT(node->commitNode == RF_FALSE);
! 388: node->status = rf_recover;
! 389: }
! 390: }
! 391: }
! 392: /* Now, fire the nodes. */
! 393: for (i = 0; i < numNodes; i++) {
! 394: if ((nodeList[i]->status == rf_fired) ||
! 395: (nodeList[i]->status == rf_recover))
! 396: rf_FireNode(nodeList[i]);
! 397: }
! 398: }
! 399:
! 400:
! 401: /*
! 402: * User context:
! 403: * Attempt to fire each node in a linked list.
! 404: * The entire list is fired atomically.
! 405: */
! 406: void
! 407: rf_FireNodeList(RF_DagNode_t *nodeList)
! 408: {
! 409: RF_DagNode_t *node, *next;
! 410: RF_DagStatus_t dstat;
! 411: int j;
! 412:
! 413: if (nodeList) {
! 414: /* First, mark all nodes which are ready to be fired. */
! 415: for (node = nodeList; node; node = next) {
! 416: next = node->next;
! 417: dstat = node->dagHdr->status;
! 418: RF_ASSERT((node->status == rf_wait) ||
! 419: (node->status == rf_good));
! 420: if (rf_NodeReady(node)) {
! 421: if ((dstat == rf_enable) ||
! 422: (dstat == rf_rollForward)) {
! 423: RF_ASSERT(node->status == rf_wait);
! 424: if (node->commitNode)
! 425: node->dagHdr->numCommits++;
! 426: node->status = rf_fired;
! 427: for (j = 0; j < node->numAntecedents;
! 428: j++)
! 429: node->antecedents[j]
! 430: ->numSuccFired++;
! 431: } else {
! 432: RF_ASSERT(dstat == rf_rollBackward);
! 433: RF_ASSERT(node->status == rf_good);
! 434: /* Only one commit node per graph. */
! 435: RF_ASSERT(node->commitNode == RF_FALSE);
! 436: node->status = rf_recover;
! 437: }
! 438: }
! 439: }
! 440: /* Now, fire the nodes. */
! 441: for (node = nodeList; node; node = next) {
! 442: next = node->next;
! 443: if ((node->status == rf_fired) ||
! 444: (node->status == rf_recover))
! 445: rf_FireNode(node);
! 446: }
! 447: }
! 448: }
! 449:
! 450:
! 451: /*
! 452: * Interrupt context:
! 453: * For each succedent,
! 454: * propagate required results from node to succedent.
! 455: * increment succedent's numAntDone.
! 456: * place newly-enable nodes on node queue for firing.
! 457: *
! 458: * To save context switches, we don't place NIL nodes on the node queue,
! 459: * but rather just process them as if they had fired. Note that NIL nodes
! 460: * that are the direct successors of the header will actually get fired by
! 461: * DispatchDAG, which is fine because no context switches are involved.
! 462: *
! 463: * Important: when running at user level, this can be called by any
! 464: * disk thread, and so the increment and check of the antecedent count
! 465: * must be locked. I used the node queue mutex and locked down the
! 466: * entire function, but this is certainly overkill.
! 467: */
! 468: void
! 469: rf_PropagateResults(RF_DagNode_t *node, int context)
! 470: {
! 471: RF_DagNode_t *s, *a;
! 472: RF_Raid_t *raidPtr;
! 473: int i, ks;
! 474: /* A list of NIL nodes to be finished. */
! 475: RF_DagNode_t *finishlist = NULL;
! 476: /* List of nodes with failed truedata antecedents. */
! 477: RF_DagNode_t *skiplist = NULL;
! 478: RF_DagNode_t *firelist = NULL; /* A list of nodes to be fired. */
! 479: RF_DagNode_t *q = NULL, *qh = NULL, *next;
! 480: int j, skipNode;
! 481:
! 482: raidPtr = node->dagHdr->raidPtr;
! 483:
! 484: DO_LOCK(raidPtr);
! 485:
! 486: /* Debug - validate fire counts. */
! 487: for (i = 0; i < node->numAntecedents; i++) {
! 488: a = *(node->antecedents + i);
! 489: RF_ASSERT(a->numSuccFired >= a->numSuccDone);
! 490: RF_ASSERT(a->numSuccFired <= a->numSuccedents);
! 491: a->numSuccDone++;
! 492: }
! 493:
! 494: switch (node->dagHdr->status) {
! 495: case rf_enable:
! 496: case rf_rollForward:
! 497: for (i = 0; i < node->numSuccedents; i++) {
! 498: s = *(node->succedents + i);
! 499: RF_ASSERT(s->status == rf_wait);
! 500: (s->numAntDone)++;
! 501: if (s->numAntDone == s->numAntecedents) {
! 502: /* Look for NIL nodes. */
! 503: if (s->doFunc == rf_NullNodeFunc) {
! 504: /*
! 505: * Don't fire NIL nodes, just process
! 506: * them.
! 507: */
! 508: s->next = finishlist;
! 509: finishlist = s;
! 510: } else {
! 511: /*
! 512: * Look to see if the node is to be
! 513: * skipped.
! 514: */
! 515: skipNode = RF_FALSE;
! 516: for (j = 0; j < s->numAntecedents; j++)
! 517: if ((s->antType[j] ==
! 518: rf_trueData) &&
! 519: (s->antecedents[j]->status
! 520: == rf_bad))
! 521: skipNode = RF_TRUE;
! 522: if (skipNode) {
! 523: /*
! 524: * This node has one or more
! 525: * failed true data
! 526: * dependencies, so skip it.
! 527: */
! 528: s->next = skiplist;
! 529: skiplist = s;
! 530: } else {
! 531: /*
! 532: * Add s to list of nodes (q)
! 533: * to execute.
! 534: */
! 535: if (context != RF_INTR_CONTEXT)
! 536: {
! 537: /*
! 538: * We only have to
! 539: * enqueue if we're at
! 540: * intr context.
! 541: */
! 542: /*
! 543: * Put node on a list to
! 544: * be fired after we
! 545: * unlock.
! 546: */
! 547: s->next = firelist;
! 548: firelist = s;
! 549: } else {
! 550: /*
! 551: * Enqueue the node for
! 552: * the dag exec thread
! 553: * to fire.
! 554: */
! 555: RF_ASSERT(rf_NodeReady(s));
! 556: if (q) {
! 557: q->next = s;
! 558: q = s;
! 559: } else {
! 560: qh = q = s;
! 561: qh->next = NULL;
! 562: }
! 563: }
! 564: }
! 565: }
! 566: }
! 567: }
! 568:
! 569: if (q) {
! 570: /*
! 571: * Transfer our local list of nodes to the node
! 572: * queue.
! 573: */
! 574: q->next = raidPtr->node_queue;
! 575: raidPtr->node_queue = qh;
! 576: DO_SIGNAL(raidPtr);
! 577: }
! 578: DO_UNLOCK(raidPtr);
! 579:
! 580: for (; skiplist; skiplist = next) {
! 581: next = skiplist->next;
! 582: skiplist->status = rf_skipped;
! 583: for (i = 0; i < skiplist->numAntecedents; i++) {
! 584: skiplist->antecedents[i]->numSuccFired++;
! 585: }
! 586: if (skiplist->commitNode) {
! 587: skiplist->dagHdr->numCommits++;
! 588: }
! 589: rf_FinishNode(skiplist, context);
! 590: }
! 591: for (; finishlist; finishlist = next) {
! 592: /* NIL nodes: no need to fire them. */
! 593: next = finishlist->next;
! 594: finishlist->status = rf_good;
! 595: for (i = 0; i < finishlist->numAntecedents; i++) {
! 596: finishlist->antecedents[i]->numSuccFired++;
! 597: }
! 598: if (finishlist->commitNode)
! 599: finishlist->dagHdr->numCommits++;
! 600: /*
! 601: * Okay, here we're calling rf_FinishNode() on nodes
! 602: * that have the null function as their work proc.
! 603: * Such a node could be the terminal node in a DAG.
! 604: * If so, it will cause the DAG to complete, which will
! 605: * in turn free memory used by the DAG, which includes
! 606: * the node in question.
! 607: * Thus, we must avoid referencing the node at all
! 608: * after calling rf_FinishNode() on it.
! 609: */
! 610: /* Recursive call. */
! 611: rf_FinishNode(finishlist, context);
! 612: }
! 613: /* Fire all nodes in firelist. */
! 614: rf_FireNodeList(firelist);
! 615: break;
! 616:
! 617: case rf_rollBackward:
! 618: for (i = 0; i < node->numAntecedents; i++) {
! 619: a = *(node->antecedents + i);
! 620: RF_ASSERT(a->status == rf_good);
! 621: RF_ASSERT(a->numSuccDone <= a->numSuccedents);
! 622: RF_ASSERT(a->numSuccDone <= a->numSuccFired);
! 623:
! 624: if (a->numSuccDone == a->numSuccFired) {
! 625: if (a->undoFunc == rf_NullNodeFunc) {
! 626: /*
! 627: * Don't fire NIL nodes, just process
! 628: * them.
! 629: */
! 630: a->next = finishlist;
! 631: finishlist = a;
! 632: } else {
! 633: if (context != RF_INTR_CONTEXT) {
! 634: /*
! 635: * We only have to enqueue if
! 636: * we're at intr context.
! 637: */
! 638: /*
! 639: * Put node on a list to
! 640: * be fired after we
! 641: * unlock.
! 642: */
! 643: a->next = firelist;
! 644: firelist = a;
! 645: } else {
! 646: /*
! 647: * Enqueue the node for
! 648: * the dag exec thread
! 649: * to fire.
! 650: */
! 651: RF_ASSERT(rf_NodeReady(a));
! 652: if (q) {
! 653: q->next = a;
! 654: q = a;
! 655: } else {
! 656: qh = q = a;
! 657: qh->next = NULL;
! 658: }
! 659: }
! 660: }
! 661: }
! 662: }
! 663: if (q) {
! 664: /*
! 665: * Transfer our local list of nodes to the node
! 666: * queue.
! 667: */
! 668: q->next = raidPtr->node_queue;
! 669: raidPtr->node_queue = qh;
! 670: DO_SIGNAL(raidPtr);
! 671: }
! 672: DO_UNLOCK(raidPtr);
! 673: for (; finishlist; finishlist = next) {
! 674: /* NIL nodes: no need to fire them. */
! 675: next = finishlist->next;
! 676: finishlist->status = rf_good;
! 677: /*
! 678: * Okay, here we're calling rf_FinishNode() on nodes
! 679: * that have the null function as their work proc.
! 680: * Such a node could be the first node in a DAG.
! 681: * If so, it will cause the DAG to complete, which will
! 682: * in turn free memory used by the DAG, which includes
! 683: * the node in question.
! 684: * Thus, we must avoid referencing the node at all
! 685: * after calling rf_FinishNode() on it.
! 686: */
! 687: rf_FinishNode(finishlist, context);
! 688: /* Recursive call. */
! 689: }
! 690: /* Fire all nodes in firelist. */
! 691: rf_FireNodeList(firelist);
! 692:
! 693: break;
! 694: default:
! 695: printf("Engine found illegal DAG status in"
! 696: " rf_PropagateResults()\n");
! 697: RF_PANIC();
! 698: break;
! 699: }
! 700: }
! 701:
! 702:
! 703: /*
! 704: * Process a fired node which has completed.
! 705: */
! 706: void
! 707: rf_ProcessNode(RF_DagNode_t *node, int context)
! 708: {
! 709: RF_Raid_t *raidPtr;
! 710:
! 711: raidPtr = node->dagHdr->raidPtr;
! 712:
! 713: switch (node->status) {
! 714: case rf_good:
! 715: /* Normal case, don't need to do anything. */
! 716: break;
! 717: case rf_bad:
! 718: if ((node->dagHdr->numCommits > 0) ||
! 719: (node->dagHdr->numCommitNodes == 0)) {
! 720: /* Crossed commit barrier. */
! 721: node->dagHdr->status = rf_rollForward;
! 722: if (rf_engineDebug || 1) {
! 723: printf("raid%d: node (%s) returned fail,"
! 724: " rolling forward\n", raidPtr->raidid,
! 725: node->name);
! 726: }
! 727: } else {
! 728: /* Never reached commit barrier. */
! 729: node->dagHdr->status = rf_rollBackward;
! 730: if (rf_engineDebug || 1) {
! 731: printf("raid%d: node (%s) returned fail,"
! 732: " rolling backward\n", raidPtr->raidid,
! 733: node->name);
! 734: }
! 735: }
! 736: break;
! 737: case rf_undone:
! 738: /* Normal rollBackward case, don't need to do anything. */
! 739: break;
! 740: case rf_panic:
! 741: /* An undo node failed !!! */
! 742: printf("UNDO of a node failed !!!/n");
! 743: break;
! 744: default:
! 745: printf("node finished execution with an illegal status !!!\n");
! 746: RF_PANIC();
! 747: break;
! 748: }
! 749:
! 750: /*
! 751: * Enqueue node's succedents (antecedents if rollBackward) for
! 752: * execution.
! 753: */
! 754: rf_PropagateResults(node, context);
! 755: }
! 756:
! 757:
! 758: /*
! 759: * User context or dag-exec-thread context:
! 760: * This is the first step in post-processing a newly-completed node.
! 761: * This routine is called by each node execution function to mark the node
! 762: * as complete and fire off any successors that have been enabled.
! 763: */
! 764: int
! 765: rf_FinishNode(RF_DagNode_t *node, int context)
! 766: {
! 767: /* As far as I can tell, retcode is not used -wvcii. */
! 768: int retcode = RF_FALSE;
! 769: node->dagHdr->numNodesCompleted++;
! 770: rf_ProcessNode(node, context);
! 771:
! 772: return (retcode);
! 773: }
! 774:
! 775:
! 776: /*
! 777: * User context:
! 778: * Submit dag for execution, return non-zero if we have to wait for completion.
! 779: * If and only if we return non-zero, we'll cause cbFunc to get invoked with
! 780: * cbArg when the DAG has completed.
! 781: *
! 782: * For now we always return 1. If the DAG does not cause any I/O, then the
! 783: * callback may get invoked before DispatchDAG returns. There's code in state
! 784: * 5 of ContinueRaidAccess to handle this.
! 785: *
! 786: * All we do here is fire the direct successors of the header node. The DAG
! 787: * execution thread does the rest of the dag processing.
! 788: */
! 789: int
! 790: rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *), void *cbArg)
! 791: {
! 792: RF_Raid_t *raidPtr;
! 793:
! 794: raidPtr = dag->raidPtr;
! 795: if (dag->tracerec) {
! 796: RF_ETIMER_START(dag->tracerec->timer);
! 797: }
! 798: if (rf_engineDebug || rf_validateDAGDebug) {
! 799: if (rf_ValidateDAG(dag))
! 800: RF_PANIC();
! 801: }
! 802: if (rf_engineDebug>1) {
! 803: printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
! 804: }
! 805: raidPtr->dags_in_flight++; /*
! 806: * Debug only: blow off proper
! 807: * locking.
! 808: */
! 809: dag->cbFunc = cbFunc;
! 810: dag->cbArg = cbArg;
! 811: dag->numNodesCompleted = 0;
! 812: dag->status = rf_enable;
! 813: rf_FireNodeArray(dag->numSuccedents, dag->succedents);
! 814: return (1);
! 815: }
! 816:
! 817:
! 818: /*
! 819: * Dedicated kernel thread:
! 820: * The thread that handles all DAG node firing.
! 821: * To minimize locking and unlocking, we grab a copy of the entire node queue
! 822: * and then set the node queue to NULL before doing any firing of nodes.
! 823: * This way we only have to release the lock once. Of course, it's probably
! 824: * rare that there's more than one node in the queue at any one time, but it
! 825: * sometimes happens.
! 826: *
! 827: * In the kernel, this thread runs at spl0 and is not swappable. I copied these
! 828: * characteristics from the aio_completion_thread.
! 829: */
! 830:
! 831: #ifdef RAID_AUTOCONFIG
! 832: void
! 833: rf_DAGExecutionThread_pre(RF_ThreadArg_t arg)
! 834: {
! 835: RF_Raid_t *raidPtr;
! 836: char raidname[16];
! 837: pid_t oldpid = lastpid;
! 838:
! 839: raidPtr = (RF_Raid_t *) arg;
! 840:
! 841: if (rf_engineDebug) {
! 842: printf("raid%d: Starting engine thread\n", raidPtr->raidid);
! 843: }
! 844:
! 845: lastpid = RF_ENGINE_PID + raidPtr->raidid - 1;
! 846: snprintf(raidname, sizeof raidname, "raid%d", raidPtr->raidid);
! 847:
! 848: if (RF_CREATE_THREAD(raidPtr->engine_thread, rf_DAGExecutionThread,
! 849: raidPtr, &raidname[0])) {
! 850: RF_ERRORMSG("RAIDFRAME: Unable to start engine thread\n");
! 851: return;
! 852: }
! 853:
! 854: lastpid = oldpid;
! 855: if (rf_engineDebug) {
! 856: printf("raid%d: Engine thread started\n", raidPtr->raidid);
! 857: }
! 858: RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
! 859: }
! 860: #endif /* RAID_AUTOCONFIG */
! 861:
! 862: void
! 863: rf_DAGExecutionThread(RF_ThreadArg_t arg)
! 864: {
! 865: RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
! 866: RF_Raid_t *raidPtr;
! 867: int ks;
! 868: int s;
! 869:
! 870: raidPtr = (RF_Raid_t *) arg;
! 871:
! 872: while (!(&raidPtr->engine_tg)->created)
! 873: (void) tsleep((void *)&(&raidPtr->engine_tg)->created, PWAIT,
! 874: "raidinit", 0);
! 875:
! 876: if (rf_engineDebug) {
! 877: printf("raid%d: Engine thread is running\n", raidPtr->raidid);
! 878: }
! 879: /* XXX What to put here ? XXX */
! 880:
! 881: s = splbio();
! 882:
! 883: RF_THREADGROUP_RUNNING(&raidPtr->engine_tg);
! 884:
! 885: rf_hook_cookies[raidPtr->raidid] =
! 886: shutdownhook_establish(rf_shutdown_hook, (void *)raidPtr);
! 887:
! 888: DO_LOCK(raidPtr);
! 889: while (!raidPtr->shutdown_engine) {
! 890:
! 891: while (raidPtr->node_queue != NULL) {
! 892: local_nq = raidPtr->node_queue;
! 893: fire_nq = NULL;
! 894: term_nq = NULL;
! 895: raidPtr->node_queue = NULL;
! 896: DO_UNLOCK(raidPtr);
! 897:
! 898: /* First, strip out the terminal nodes. */
! 899: while (local_nq) {
! 900: nd = local_nq;
! 901: local_nq = local_nq->next;
! 902: switch (nd->dagHdr->status) {
! 903: case rf_enable:
! 904: case rf_rollForward:
! 905: if (nd->numSuccedents == 0) {
! 906: /*
! 907: * End of the dag, add to
! 908: * callback list.
! 909: */
! 910: nd->next = term_nq;
! 911: term_nq = nd;
! 912: } else {
! 913: /*
! 914: * Not the end, add to the
! 915: * fire queue.
! 916: */
! 917: nd->next = fire_nq;
! 918: fire_nq = nd;
! 919: }
! 920: break;
! 921: case rf_rollBackward:
! 922: if (nd->numAntecedents == 0) {
! 923: /*
! 924: * End of the dag, add to the
! 925: * callback list.
! 926: */
! 927: nd->next = term_nq;
! 928: term_nq = nd;
! 929: } else {
! 930: /*
! 931: * Not the end, add to the
! 932: * fire queue.
! 933: */
! 934: nd->next = fire_nq;
! 935: fire_nq = nd;
! 936: }
! 937: break;
! 938: default:
! 939: RF_PANIC();
! 940: break;
! 941: }
! 942: }
! 943:
! 944: /*
! 945: * Execute callback of dags which have reached the
! 946: * terminal node.
! 947: */
! 948: while (term_nq) {
! 949: nd = term_nq;
! 950: term_nq = term_nq->next;
! 951: nd->next = NULL;
! 952: (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
! 953: raidPtr->dags_in_flight--; /* Debug only. */
! 954: }
! 955:
! 956: /* Fire remaining nodes. */
! 957: rf_FireNodeList(fire_nq);
! 958:
! 959: DO_LOCK(raidPtr);
! 960: }
! 961: while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL)
! 962: DO_WAIT(raidPtr);
! 963: }
! 964: DO_UNLOCK(raidPtr);
! 965:
! 966: if (rf_hook_cookies && rf_hook_cookies[raidPtr->raidid] != NULL) {
! 967: shutdownhook_disestablish(rf_hook_cookies[raidPtr->raidid]);
! 968: rf_hook_cookies[raidPtr->raidid] = NULL;
! 969: }
! 970:
! 971: RF_THREADGROUP_DONE(&raidPtr->engine_tg);
! 972:
! 973: splx(s);
! 974: kthread_exit(0);
! 975: }
CVSweb