Annotation of sys/dev/raidframe/rf_dagffwr.c, Revision 1.1.1.1
1.1 nbrk 1: /* $OpenBSD: rf_dagffwr.c,v 1.5 2002/12/16 07:01:03 tdeval Exp $ */
2: /* $NetBSD: rf_dagffwr.c,v 1.5 2000/01/07 03:40:58 oster Exp $ */
3:
4: /*
5: * Copyright (c) 1995 Carnegie-Mellon University.
6: * All rights reserved.
7: *
8: * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
9: *
10: * Permission to use, copy, modify and distribute this software and
11: * its documentation is hereby granted, provided that both the copyright
12: * notice and this permission notice appear in all copies of the
13: * software, derivative works or modified versions, and any portions
14: * thereof, and that both notices appear in supporting documentation.
15: *
16: * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
17: * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
18: * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
19: *
20: * Carnegie Mellon requests users of this software to return to
21: *
22: * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
23: * School of Computer Science
24: * Carnegie Mellon University
25: * Pittsburgh PA 15213-3890
26: *
27: * any improvements or extensions that they make and grant Carnegie the
28: * rights to redistribute these changes.
29: */
30:
31: /*
32: * rf_dagffwr.c
33: *
34: * Code for creating fault-free DAGs.
35: *
36: */
37:
38: #include "rf_types.h"
39: #include "rf_raid.h"
40: #include "rf_dag.h"
41: #include "rf_dagutils.h"
42: #include "rf_dagfuncs.h"
43: #include "rf_debugMem.h"
44: #include "rf_dagffrd.h"
45: #include "rf_memchunk.h"
46: #include "rf_general.h"
47: #include "rf_dagffwr.h"
48:
49: /*****************************************************************************
50: *
51: * General comments on DAG creation:
52: *
53: * All DAGs in this file use roll-away error recovery. Each DAG has a single
54: * commit node, usually called "Cmt." If an error occurs before the Cmt node
55: * is reached, the execution engine will halt forward execution and work
56: * backward through the graph, executing the undo functions. Assuming that
57: * each node in the graph prior to the Cmt node is undoable and atomic - or -
58: * does not make changes to permanent state, the graph will fail atomically.
59: * If an error occurs after the Cmt node executes, the engine will roll-forward
60: * through the graph, blindly executing nodes until it reaches the end.
61: * If a graph reaches the end, it is assumed to have completed successfully.
62: *
63: * A graph has only 1 Cmt node.
64: *
65: *****************************************************************************/
66:
67:
68: /*****************************************************************************
69: *
70: * The following wrappers map the standard DAG creation interface to the
71: * DAG creation routines. Additionally, these wrappers enable experimentation
72: * with new DAG structures by providing an extra level of indirection, allowing
73: * the DAG creation routines to be replaced at this single point.
74: *
75: *****************************************************************************/
76:
77:
78: void
79: rf_CreateNonRedundantWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
80: RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
81: RF_AllocListElem_t *allocList, RF_IoType_t type)
82: {
83: rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
84: RF_IO_TYPE_WRITE);
85: }
86:
87: void
88: rf_CreateRAID0WriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
89: RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
90: RF_AllocListElem_t *allocList, RF_IoType_t type)
91: {
92: rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
93: RF_IO_TYPE_WRITE);
94: }
95:
96: void
97: rf_CreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
98: RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
99: RF_AllocListElem_t *allocList)
100: {
101: /* "normal" rollaway. */
102: rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags,
103: allocList, &rf_xorFuncs, NULL);
104: }
105:
106: void
107: rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
108: RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
109: RF_AllocListElem_t *allocList)
110: {
111: /* "normal" rollaway. */
112: rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags,
113: allocList, 1, rf_RegularXorFunc, RF_TRUE);
114: }
115:
116:
117: /*****************************************************************************
118: *
119: * DAG creation code begins here.
120: *
121: *****************************************************************************/
122:
123:
124: /*****************************************************************************
125: *
126: * creates a DAG to perform a large-write operation:
127: *
128: * / Rod \ / Wnd \
129: * H -- block- Rod - Xor - Cmt - Wnd --- T
130: * \ Rod / \ Wnp /
131: * \[Wnq]/
132: *
133: * The XOR node also does the Q calculation in the P+Q architecture.
134: * All nodes that are before the commit node (Cmt) are assumed to be atomic
135: * and undoable - or - they make no changes to permanent state.
136: *
137: * Rod = read old data
138: * Cmt = commit node
139: * Wnp = write new parity
140: * Wnd = write new data
141: * Wnq = write new "q"
142: * [] denotes optional segments in the graph.
143: *
144: * Parameters: raidPtr - description of the physical array
145: * asmap - logical & physical addresses for this access
146: * bp - buffer ptr (holds write data)
147: * flags - general flags (e.g. disk locking)
148: * allocList - list of memory allocated in DAG creation
149: * nfaults - number of faults array can tolerate
150: * (equal to # redundancy units in stripe)
151: * redfuncs - list of redundancy generating functions
152: *
153: *****************************************************************************/
154:
155: void
156: rf_CommonCreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
157: RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
158: RF_AllocListElem_t *allocList, int nfaults, int (*redFunc) (RF_DagNode_t *),
159: int allowBufferRecycle)
160: {
161: RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
162: RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
163: int nWndNodes, nRodNodes, i, nodeNum, asmNum;
164: RF_AccessStripeMapHeader_t *new_asm_h[2];
165: RF_StripeNum_t parityStripeID;
166: char *sosBuffer, *eosBuffer;
167: RF_ReconUnitNum_t which_ru;
168: RF_RaidLayout_t *layoutPtr;
169: RF_PhysDiskAddr_t *pda;
170:
171: layoutPtr = &(raidPtr->Layout);
172: parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr,
173: asmap->raidAddress, &which_ru);
174:
175: if (rf_dagDebug) {
176: printf("[Creating large-write DAG]\n");
177: }
178: dag_h->creator = "LargeWriteDAG";
179:
180: dag_h->numCommitNodes = 1;
181: dag_h->numCommits = 0;
182: dag_h->numSuccedents = 1;
183:
184: /* Alloc the nodes: Wnd, xor, commit, block, term, and Wnp. */
185: nWndNodes = asmap->numStripeUnitsAccessed;
186: RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
187: (RF_DagNode_t *), allocList);
188: i = 0;
189: wndNodes = &nodes[i];
190: i += nWndNodes;
191: xorNode = &nodes[i];
192: i += 1;
193: wnpNode = &nodes[i];
194: i += 1;
195: blockNode = &nodes[i];
196: i += 1;
197: commitNode = &nodes[i];
198: i += 1;
199: termNode = &nodes[i];
200: i += 1;
201: if (nfaults == 2) {
202: wnqNode = &nodes[i];
203: i += 1;
204: } else {
205: wnqNode = NULL;
206: }
207: rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
208: new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
209: if (nRodNodes > 0) {
210: RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
211: (RF_DagNode_t *), allocList);
212: } else {
213: rodNodes = NULL;
214: }
215:
216: /* Begin node initialization. */
217: if (nRodNodes > 0) {
218: rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
219: rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h,
220: "Nil", allocList);
221: } else {
222: rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
223: rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil",
224: allocList);
225: }
226:
227: rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
228: rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0,
229: dag_h, "Cmt", allocList);
230: rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
231: rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0,
232: dag_h, "Trm", allocList);
233:
234: /* Initialize the Rod nodes. */
235: for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
236: if (new_asm_h[asmNum]) {
237: pda = new_asm_h[asmNum]->stripeMap->physInfo;
238: while (pda) {
239: rf_InitNode(&rodNodes[nodeNum], rf_wait,
240: RF_FALSE, rf_DiskReadFunc,
241: rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
242: 1, 1, 4, 0, dag_h, "Rod", allocList);
243: rodNodes[nodeNum].params[0].p = pda;
244: rodNodes[nodeNum].params[1].p = pda->bufPtr;
245: rodNodes[nodeNum].params[2].v = parityStripeID;
246: rodNodes[nodeNum].params[3].v =
247: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
248: 0, 0, which_ru);
249: nodeNum++;
250: pda = pda->next;
251: }
252: }
253: }
254: RF_ASSERT(nodeNum == nRodNodes);
255:
256: /* Initialize the wnd nodes. */
257: pda = asmap->physInfo;
258: for (i = 0; i < nWndNodes; i++) {
259: rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
260: rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
261: dag_h, "Wnd", allocList);
262: RF_ASSERT(pda != NULL);
263: wndNodes[i].params[0].p = pda;
264: wndNodes[i].params[1].p = pda->bufPtr;
265: wndNodes[i].params[2].v = parityStripeID;
266: wndNodes[i].params[3].v =
267: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
268: pda = pda->next;
269: }
270:
271: /* Initialize the redundancy node. */
272: if (nRodNodes > 0) {
273: rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
274: rf_NullNodeUndoFunc, NULL, 1, nRodNodes,
275: 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
276: "Xr ", allocList);
277: } else {
278: rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
279: rf_NullNodeUndoFunc, NULL, 1, 1,
280: 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
281: "Xr ", allocList);
282: }
283: xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
284: for (i = 0; i < nWndNodes; i++) {
285: xorNode->params[2 * i + 0] =
286: wndNodes[i].params[0]; /* pda */
287: xorNode->params[2 * i + 1] =
288: wndNodes[i].params[1]; /* buf ptr */
289: }
290: for (i = 0; i < nRodNodes; i++) {
291: xorNode->params[2 * (nWndNodes + i) + 0] =
292: rodNodes[i].params[0]; /* pda */
293: xorNode->params[2 * (nWndNodes + i) + 1] =
294: rodNodes[i].params[1]; /* buf ptr */
295: }
296: /* Xor node needs to get at RAID information. */
297: xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
298:
299: /*
300: * Look for an Rod node that reads a complete SU. If none, alloc
301: * a buffer to receive the parity info. Note that we can't use a
302: * new data buffer because it will not have gotten written when
303: * the xor occurs.
304: */
305: if (allowBufferRecycle) {
306: for (i = 0; i < nRodNodes; i++) {
307: if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)
308: ->numSector == raidPtr->Layout.sectorsPerStripeUnit)
309: break;
310: }
311: }
312: if ((!allowBufferRecycle) || (i == nRodNodes)) {
313: RF_CallocAndAdd(xorNode->results[0], 1,
314: rf_RaidAddressToByte(raidPtr,
315: raidPtr->Layout.sectorsPerStripeUnit),
316: (void *), allocList);
317: } else {
318: xorNode->results[0] = rodNodes[i].params[1].p;
319: }
320:
321: /* Initialize the Wnp node. */
322: rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
323: rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
324: dag_h, "Wnp", allocList);
325: wnpNode->params[0].p = asmap->parityInfo;
326: wnpNode->params[1].p = xorNode->results[0];
327: wnpNode->params[2].v = parityStripeID;
328: wnpNode->params[3].v =
329: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
330: /* parityInfo must describe entire parity unit. */
331: RF_ASSERT(asmap->parityInfo->next == NULL);
332:
333: if (nfaults == 2) {
334: /*
335: * We never try to recycle a buffer for the Q calculation
336: * in addition to the parity. This would cause two buffers
337: * to get smashed during the P and Q calculation, guaranteeing
338: * one would be wrong.
339: */
340: RF_CallocAndAdd(xorNode->results[1], 1,
341: rf_RaidAddressToByte(raidPtr,
342: raidPtr->Layout.sectorsPerStripeUnit),
343: (void *), allocList);
344: rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
345: rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
346: dag_h, "Wnq", allocList);
347: wnqNode->params[0].p = asmap->qInfo;
348: wnqNode->params[1].p = xorNode->results[1];
349: wnqNode->params[2].v = parityStripeID;
350: wnqNode->params[3].v =
351: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
352: /* parityInfo must describe entire parity unit. */
353: RF_ASSERT(asmap->parityInfo->next == NULL);
354: }
355: /*
356: * Connect nodes to form graph.
357: */
358:
359: /* Connect dag header to block node. */
360: RF_ASSERT(blockNode->numAntecedents == 0);
361: dag_h->succedents[0] = blockNode;
362:
363: if (nRodNodes > 0) {
364: /* Connect the block node to the Rod nodes. */
365: RF_ASSERT(blockNode->numSuccedents == nRodNodes);
366: RF_ASSERT(xorNode->numAntecedents == nRodNodes);
367: for (i = 0; i < nRodNodes; i++) {
368: RF_ASSERT(rodNodes[i].numAntecedents == 1);
369: blockNode->succedents[i] = &rodNodes[i];
370: rodNodes[i].antecedents[0] = blockNode;
371: rodNodes[i].antType[0] = rf_control;
372:
373: /* Connect the Rod nodes to the Xor node. */
374: RF_ASSERT(rodNodes[i].numSuccedents == 1);
375: rodNodes[i].succedents[0] = xorNode;
376: xorNode->antecedents[i] = &rodNodes[i];
377: xorNode->antType[i] = rf_trueData;
378: }
379: } else {
380: /* Connect the block node to the Xor node. */
381: RF_ASSERT(blockNode->numSuccedents == 1);
382: RF_ASSERT(xorNode->numAntecedents == 1);
383: blockNode->succedents[0] = xorNode;
384: xorNode->antecedents[0] = blockNode;
385: xorNode->antType[0] = rf_control;
386: }
387:
388: /* Connect the xor node to the commit node. */
389: RF_ASSERT(xorNode->numSuccedents == 1);
390: RF_ASSERT(commitNode->numAntecedents == 1);
391: xorNode->succedents[0] = commitNode;
392: commitNode->antecedents[0] = xorNode;
393: commitNode->antType[0] = rf_control;
394:
395: /* Connect the commit node to the write nodes. */
396: RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
397: for (i = 0; i < nWndNodes; i++) {
398: RF_ASSERT(wndNodes->numAntecedents == 1);
399: commitNode->succedents[i] = &wndNodes[i];
400: wndNodes[i].antecedents[0] = commitNode;
401: wndNodes[i].antType[0] = rf_control;
402: }
403: RF_ASSERT(wnpNode->numAntecedents == 1);
404: commitNode->succedents[nWndNodes] = wnpNode;
405: wnpNode->antecedents[0] = commitNode;
406: wnpNode->antType[0] = rf_trueData;
407: if (nfaults == 2) {
408: RF_ASSERT(wnqNode->numAntecedents == 1);
409: commitNode->succedents[nWndNodes + 1] = wnqNode;
410: wnqNode->antecedents[0] = commitNode;
411: wnqNode->antType[0] = rf_trueData;
412: }
413: /* Connect the write nodes to the term node. */
414: RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
415: RF_ASSERT(termNode->numSuccedents == 0);
416: for (i = 0; i < nWndNodes; i++) {
417: RF_ASSERT(wndNodes->numSuccedents == 1);
418: wndNodes[i].succedents[0] = termNode;
419: termNode->antecedents[i] = &wndNodes[i];
420: termNode->antType[i] = rf_control;
421: }
422: RF_ASSERT(wnpNode->numSuccedents == 1);
423: wnpNode->succedents[0] = termNode;
424: termNode->antecedents[nWndNodes] = wnpNode;
425: termNode->antType[nWndNodes] = rf_control;
426: if (nfaults == 2) {
427: RF_ASSERT(wnqNode->numSuccedents == 1);
428: wnqNode->succedents[0] = termNode;
429: termNode->antecedents[nWndNodes + 1] = wnqNode;
430: termNode->antType[nWndNodes + 1] = rf_control;
431: }
432: }
433: /*****************************************************************************
434: *
435: * Create a DAG to perform a small-write operation (either raid 5 or pq),
436: * which is as follows:
437: *
438: * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
439: * \- Rod X / \----> Wnd [Und]-/
440: * [\- Rod X / \---> Wnd [Und]-/]
441: * [\- Roq -> Q / \--> Wnq [Unq]-/]
442: *
443: * Rop = read old parity
444: * Rod = read old data
445: * Roq = read old "q"
446: * Cmt = commit node
447: * Und = unlock data disk
448: * Unp = unlock parity disk
449: * Unq = unlock q disk
450: * Wnp = write new parity
451: * Wnd = write new data
452: * Wnq = write new "q"
453: * [ ] denotes optional segments in the graph.
454: *
455: * Parameters: raidPtr - description of the physical array
456: * asmap - logical & physical addresses for this access
457: * bp - buffer ptr (holds write data)
458: * flags - general flags (e.g. disk locking)
459: * allocList - list of memory allocated in DAG creation
460: * pfuncs - list of parity generating functions
461: * qfuncs - list of q generating functions
462: *
463: * A null qfuncs indicates single fault tolerant.
464: *****************************************************************************/
465:
466: void
467: rf_CommonCreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
468: RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
469: RF_AllocListElem_t *allocList, RF_RedFuncs_t *pfuncs, RF_RedFuncs_t *qfuncs)
470: {
471: RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
472: RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
473: RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
474: RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
475: int i, j, nNodes, totalNumNodes, lu_flag;
476: RF_ReconUnitNum_t which_ru;
477: int (*func) (RF_DagNode_t *);
478: int (*undoFunc) (RF_DagNode_t *);
479: int (*qfunc) (RF_DagNode_t *);
480: int numDataNodes, numParityNodes;
481: RF_StripeNum_t parityStripeID;
482: RF_PhysDiskAddr_t *pda;
483: char *name, *qname;
484: long nfaults;
485:
486: nfaults = qfuncs ? 2 : 1;
487: lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* Lock/unlock flag. */
488:
489: parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
490: asmap->raidAddress, &which_ru);
491: pda = asmap->physInfo;
492: numDataNodes = asmap->numStripeUnitsAccessed;
493: numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
494:
495: if (rf_dagDebug) {
496: printf("[Creating small-write DAG]\n");
497: }
498: RF_ASSERT(numDataNodes > 0);
499: dag_h->creator = "SmallWriteDAG";
500:
501: dag_h->numCommitNodes = 1;
502: dag_h->numCommits = 0;
503: dag_h->numSuccedents = 1;
504:
505: /*
506: * DAG creation occurs in four steps:
507: * 1. Count the number of nodes in the DAG.
508: * 2. Create the nodes.
509: * 3. Initialize the nodes.
510: * 4. Connect the nodes.
511: */
512:
513: /*
514: * Step 1. Compute number of nodes in the graph.
515: */
516:
517: /*
518: * Number of nodes: a read and write for each data unit, a redundancy
519: * computation node for each parity node (nfaults * nparity), a read
520: * and write for each parity unit, a block and commit node (2), a
521: * terminate node if atomic RMW, an unlock node for each
522: * data/redundancy unit.
523: */
524: totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
525: + (nfaults * 2 * numParityNodes) + 3;
526: if (lu_flag) {
527: totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
528: }
529: /*
530: * Step 2. Create the nodes.
531: */
532: RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
533: (RF_DagNode_t *), allocList);
534: i = 0;
535: blockNode = &nodes[i];
536: i += 1;
537: commitNode = &nodes[i];
538: i += 1;
539: readDataNodes = &nodes[i];
540: i += numDataNodes;
541: readParityNodes = &nodes[i];
542: i += numParityNodes;
543: writeDataNodes = &nodes[i];
544: i += numDataNodes;
545: writeParityNodes = &nodes[i];
546: i += numParityNodes;
547: xorNodes = &nodes[i];
548: i += numParityNodes;
549: termNode = &nodes[i];
550: i += 1;
551: if (lu_flag) {
552: unlockDataNodes = &nodes[i];
553: i += numDataNodes;
554: unlockParityNodes = &nodes[i];
555: i += numParityNodes;
556: } else {
557: unlockDataNodes = unlockParityNodes = NULL;
558: }
559: if (nfaults == 2) {
560: readQNodes = &nodes[i];
561: i += numParityNodes;
562: writeQNodes = &nodes[i];
563: i += numParityNodes;
564: qNodes = &nodes[i];
565: i += numParityNodes;
566: if (lu_flag) {
567: unlockQNodes = &nodes[i];
568: i += numParityNodes;
569: } else {
570: unlockQNodes = NULL;
571: }
572: } else {
573: readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
574: }
575: RF_ASSERT(i == totalNumNodes);
576:
577: /*
578: * Step 3. Initialize the nodes.
579: */
580: /* Initialize block node (Nil). */
581: nNodes = numDataNodes + (nfaults * numParityNodes);
582: rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
583: rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h,
584: "Nil", allocList);
585:
586: /* Initialize commit node (Cmt). */
587: rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
588: rf_NullNodeUndoFunc, NULL, nNodes, (nfaults * numParityNodes),
589: 0, 0, dag_h, "Cmt", allocList);
590:
591: /* Initialize terminate node (Trm). */
592: rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
593: rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h,
594: "Trm", allocList);
595:
596: /* Initialize nodes which read old data (Rod). */
597: for (i = 0; i < numDataNodes; i++) {
598: rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
599: rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
600: (nfaults * numParityNodes), 1, 4, 0, dag_h, "Rod",
601: allocList);
602: RF_ASSERT(pda != NULL);
603: /* Physical disk addr desc. */
604: readDataNodes[i].params[0].p = pda;
605: /* Buffer to hold old data. */
606: readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
607: dag_h, pda, allocList);
608: readDataNodes[i].params[2].v = parityStripeID;
609: readDataNodes[i].params[3].v =
610: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
611: lu_flag, 0, which_ru);
612: pda = pda->next;
613: for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
614: readDataNodes[i].propList[j] = NULL;
615: }
616: }
617:
618: /* Initialize nodes which read old parity (Rop). */
619: pda = asmap->parityInfo;
620: i = 0;
621: for (i = 0; i < numParityNodes; i++) {
622: RF_ASSERT(pda != NULL);
623: rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
624: rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
625: numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
626: readParityNodes[i].params[0].p = pda;
627: /* Buffer to hold old parity. */
628: readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
629: dag_h, pda, allocList);
630: readParityNodes[i].params[2].v = parityStripeID;
631: readParityNodes[i].params[3].v =
632: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
633: lu_flag, 0, which_ru);
634: pda = pda->next;
635: for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
636: readParityNodes[i].propList[0] = NULL;
637: }
638: }
639:
640: /* Initialize nodes which read old Q (Roq). */
641: if (nfaults == 2) {
642: pda = asmap->qInfo;
643: for (i = 0; i < numParityNodes; i++) {
644: RF_ASSERT(pda != NULL);
645: rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
646: rf_DiskReadFunc, rf_DiskReadUndoFunc,
647: rf_GenericWakeupFunc, numParityNodes,
648: 1, 4, 0, dag_h, "Roq", allocList);
649: readQNodes[i].params[0].p = pda;
650: /* Buffer to hold old Q. */
651: readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
652: dag_h, pda, allocList);
653: readQNodes[i].params[2].v = parityStripeID;
654: readQNodes[i].params[3].v =
655: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
656: lu_flag, 0, which_ru);
657: pda = pda->next;
658: for (j = 0; j < readQNodes[i].numSuccedents; j++) {
659: readQNodes[i].propList[0] = NULL;
660: }
661: }
662: }
663: /* Initialize nodes which write new data (Wnd). */
664: pda = asmap->physInfo;
665: for (i = 0; i < numDataNodes; i++) {
666: RF_ASSERT(pda != NULL);
667: rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
668: rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
669: rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
670: "Wnd", allocList);
671: /* Physical disk addr desc. */
672: writeDataNodes[i].params[0].p = pda;
673: /* Buffer holding new data to be written. */
674: writeDataNodes[i].params[1].p = pda->bufPtr;
675: writeDataNodes[i].params[2].v = parityStripeID;
676: writeDataNodes[i].params[3].v =
677: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
678: if (lu_flag) {
679: /* Initialize node to unlock the disk queue. */
680: rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE,
681: rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
682: rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
683: "Und", allocList);
684: /* Physical disk addr desc. */
685: unlockDataNodes[i].params[0].p = pda;
686: unlockDataNodes[i].params[1].v =
687: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
688: 0, lu_flag, which_ru);
689: }
690: pda = pda->next;
691: }
692:
693: /*
694: * Initialize nodes which compute new parity and Q.
695: */
696: /*
697: * We use the simple XOR func in the double-XOR case, and when
698: * we're accessing only a portion of one stripe unit.
699: * The distinction between the two is that the regular XOR func
700: * assumes that the targbuf is a full SU in size, and examines
701: * the pda associated with the buffer to decide where within
702: * the buffer to XOR the data, whereas the simple XOR func just
703: * XORs the data into the start of the buffer.
704: */
705: if ((numParityNodes == 2) || ((numDataNodes == 1) &&
706: (asmap->totalSectorsAccessed <
707: raidPtr->Layout.sectorsPerStripeUnit))) {
708: func = pfuncs->simple;
709: undoFunc = rf_NullNodeUndoFunc;
710: name = pfuncs->SimpleName;
711: if (qfuncs) {
712: qfunc = qfuncs->simple;
713: qname = qfuncs->SimpleName;
714: } else {
715: qfunc = NULL;
716: qname = NULL;
717: }
718: } else {
719: func = pfuncs->regular;
720: undoFunc = rf_NullNodeUndoFunc;
721: name = pfuncs->RegularName;
722: if (qfuncs) {
723: qfunc = qfuncs->regular;
724: qname = qfuncs->RegularName;
725: } else {
726: qfunc = NULL;
727: qname = NULL;
728: }
729: }
730: /*
731: * Initialize the xor nodes: params are {pda,buf}.
732: * From {Rod,Wnd,Rop} nodes, and raidPtr.
733: */
734: if (numParityNodes == 2) {
735: /* Double-xor case. */
736: for (i = 0; i < numParityNodes; i++) {
737: /* Note: no wakeup func for xor. */
738: rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
739: undoFunc, NULL, 1, (numDataNodes + numParityNodes),
740: 7, 1, dag_h, name, allocList);
741: xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
742: xorNodes[i].params[0] = readDataNodes[i].params[0];
743: xorNodes[i].params[1] = readDataNodes[i].params[1];
744: xorNodes[i].params[2] = readParityNodes[i].params[0];
745: xorNodes[i].params[3] = readParityNodes[i].params[1];
746: xorNodes[i].params[4] = writeDataNodes[i].params[0];
747: xorNodes[i].params[5] = writeDataNodes[i].params[1];
748: xorNodes[i].params[6].p = raidPtr;
749: /* Use old parity buf as target buf. */
750: xorNodes[i].results[0] = readParityNodes[i].params[1].p;
751: if (nfaults == 2) {
752: /* Note: no wakeup func for qor. */
753: rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
754: qfunc, undoFunc, NULL, 1,
755: (numDataNodes + numParityNodes), 7, 1,
756: dag_h, qname, allocList);
757: qNodes[i].params[0] =
758: readDataNodes[i].params[0];
759: qNodes[i].params[1] =
760: readDataNodes[i].params[1];
761: qNodes[i].params[2] = readQNodes[i].params[0];
762: qNodes[i].params[3] = readQNodes[i].params[1];
763: qNodes[i].params[4] =
764: writeDataNodes[i].params[0];
765: qNodes[i].params[5] =
766: writeDataNodes[i].params[1];
767: qNodes[i].params[6].p = raidPtr;
768: /* Use old Q buf as target buf. */
769: qNodes[i].results[0] =
770: readQNodes[i].params[1].p;
771: }
772: }
773: } else {
774: /* There is only one xor node in this case. */
775: rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc,
776: NULL, 1, (numDataNodes + numParityNodes),
777: (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
778: dag_h, name, allocList);
779: xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
780: for (i = 0; i < numDataNodes + 1; i++) {
781: /* Set up params related to Rod and Rop nodes. */
782: xorNodes[0].params[2 * i + 0] =
783: readDataNodes[i].params[0]; /* pda */
784: xorNodes[0].params[2 * i + 1] =
785: readDataNodes[i].params[1]; /* buffer ptr */
786: }
787: for (i = 0; i < numDataNodes; i++) {
788: /* Set up params related to Wnd and Wnp nodes. */
789: xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =
790: writeDataNodes[i].params[0]; /* pda */
791: xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =
792: writeDataNodes[i].params[1]; /* buffer ptr */
793: }
794: /* Xor node needs to get at RAID information. */
795: xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p =
796: raidPtr;
797: xorNodes[0].results[0] = readParityNodes[0].params[1].p;
798: if (nfaults == 2) {
799: rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
800: undoFunc, NULL, 1, (numDataNodes + numParityNodes),
801: (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
802: dag_h, qname, allocList);
803: for (i = 0; i < numDataNodes; i++) {
804: /* Set up params related to Rod. */
805: qNodes[0].params[2 * i + 0] =
806: readDataNodes[i].params[0]; /* pda */
807: qNodes[0].params[2 * i + 1] =
808: readDataNodes[i].params[1]; /* buffer ptr */
809: }
810: /* And read old q. */
811: qNodes[0].params[2 * numDataNodes + 0] =
812: readQNodes[0].params[0]; /* pda */
813: qNodes[0].params[2 * numDataNodes + 1] =
814: readQNodes[0].params[1]; /* buffer ptr */
815: for (i = 0; i < numDataNodes; i++) {
816: /* Set up params related to Wnd nodes. */
817: qNodes[0].params
818: [2 * (numDataNodes + 1 + i) + 0] =
819: /* pda */
820: writeDataNodes[i].params[0];
821: qNodes[0].params
822: [2 * (numDataNodes + 1 + i) + 1] =
823: /* buffer ptr */
824: writeDataNodes[i].params[1];
825: }
826: /* Xor node needs to get at RAID information. */
827: qNodes[0].params
828: [2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
829: qNodes[0].results[0] = readQNodes[0].params[1].p;
830: }
831: }
832:
833: /* Initialize nodes which write new parity (Wnp). */
834: pda = asmap->parityInfo;
835: for (i = 0; i < numParityNodes; i++) {
836: rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
837: rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
838: rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
839: "Wnp", allocList);
840: RF_ASSERT(pda != NULL);
841: /* Param 1 (bufPtr) filled in by xor node. */
842: writeParityNodes[i].params[0].p = pda;
843: /* Buffer pointer for parity write operation. */
844: writeParityNodes[i].params[1].p = xorNodes[i].results[0];
845: writeParityNodes[i].params[2].v = parityStripeID;
846: writeParityNodes[i].params[3].v =
847: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
848: if (lu_flag) {
849: /* Initialize node to unlock the disk queue. */
850: rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE,
851: rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
852: rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
853: "Unp", allocList);
854: /* Physical disk addr desc. */
855: unlockParityNodes[i].params[0].p = pda;
856: unlockParityNodes[i].params[1].v =
857: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
858: 0, lu_flag, which_ru);
859: }
860: pda = pda->next;
861: }
862:
863: /* Initialize nodes which write new Q (Wnq). */
864: if (nfaults == 2) {
865: pda = asmap->qInfo;
866: for (i = 0; i < numParityNodes; i++) {
867: rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
868: rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
869: rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
870: "Wnq", allocList);
871: RF_ASSERT(pda != NULL);
872: /* Param 1 (bufPtr) filled in by xor node. */
873: writeQNodes[i].params[0].p = pda;
874: writeQNodes[i].params[1].p = qNodes[i].results[0];
875: /* Buffer pointer for parity write operation. */
876: writeQNodes[i].params[2].v = parityStripeID;
877: writeQNodes[i].params[3].v =
878: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
879: 0, 0, which_ru);
880: if (lu_flag) {
881: /* Initialize node to unlock the disk queue. */
882: rf_InitNode(&unlockQNodes[i], rf_wait,
883: RF_FALSE, rf_DiskUnlockFunc,
884: rf_DiskUnlockUndoFunc,
885: rf_GenericWakeupFunc, 1, 1, 2, 0,
886: dag_h, "Unq", allocList);
887: /* Physical disk addr desc. */
888: unlockQNodes[i].params[0].p = pda;
889: unlockQNodes[i].params[1].v =
890: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
891: 0, lu_flag, which_ru);
892: }
893: pda = pda->next;
894: }
895: }
896: /*
897: * Step 4. Connect the nodes.
898: */
899:
900: /* Connect header to block node. */
901: dag_h->succedents[0] = blockNode;
902:
903: /* Connect block node to read old data nodes. */
904: RF_ASSERT(blockNode->numSuccedents ==
905: (numDataNodes + (numParityNodes * nfaults)));
906: for (i = 0; i < numDataNodes; i++) {
907: blockNode->succedents[i] = &readDataNodes[i];
908: RF_ASSERT(readDataNodes[i].numAntecedents == 1);
909: readDataNodes[i].antecedents[0] = blockNode;
910: readDataNodes[i].antType[0] = rf_control;
911: }
912:
913: /* Connect block node to read old parity nodes. */
914: for (i = 0; i < numParityNodes; i++) {
915: blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
916: RF_ASSERT(readParityNodes[i].numAntecedents == 1);
917: readParityNodes[i].antecedents[0] = blockNode;
918: readParityNodes[i].antType[0] = rf_control;
919: }
920:
921: /* Connect block node to read old Q nodes. */
922: if (nfaults == 2) {
923: for (i = 0; i < numParityNodes; i++) {
924: blockNode->succedents[numDataNodes + numParityNodes + i]
925: = &readQNodes[i];
926: RF_ASSERT(readQNodes[i].numAntecedents == 1);
927: readQNodes[i].antecedents[0] = blockNode;
928: readQNodes[i].antType[0] = rf_control;
929: }
930: }
931: /* Connect read old data nodes to xor nodes. */
932: for (i = 0; i < numDataNodes; i++) {
933: RF_ASSERT(readDataNodes[i].numSuccedents ==
934: (nfaults * numParityNodes));
935: for (j = 0; j < numParityNodes; j++) {
936: RF_ASSERT(xorNodes[j].numAntecedents ==
937: numDataNodes + numParityNodes);
938: readDataNodes[i].succedents[j] = &xorNodes[j];
939: xorNodes[j].antecedents[i] = &readDataNodes[i];
940: xorNodes[j].antType[i] = rf_trueData;
941: }
942: }
943:
944: /* Connect read old data nodes to q nodes. */
945: if (nfaults == 2) {
946: for (i = 0; i < numDataNodes; i++) {
947: for (j = 0; j < numParityNodes; j++) {
948: RF_ASSERT(qNodes[j].numAntecedents ==
949: numDataNodes + numParityNodes);
950: readDataNodes[i].succedents[numParityNodes + j]
951: = &qNodes[j];
952: qNodes[j].antecedents[i] = &readDataNodes[i];
953: qNodes[j].antType[i] = rf_trueData;
954: }
955: }
956: }
957: /* Connect read old parity nodes to xor nodes. */
958: for (i = 0; i < numParityNodes; i++) {
959: RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
960: for (j = 0; j < numParityNodes; j++) {
961: readParityNodes[i].succedents[j] = &xorNodes[j];
962: xorNodes[j].antecedents[numDataNodes + i] =
963: &readParityNodes[i];
964: xorNodes[j].antType[numDataNodes + i] = rf_trueData;
965: }
966: }
967:
968: /* Connect read old q nodes to q nodes. */
969: if (nfaults == 2) {
970: for (i = 0; i < numParityNodes; i++) {
971: RF_ASSERT(readParityNodes[i].numSuccedents ==
972: numParityNodes);
973: for (j = 0; j < numParityNodes; j++) {
974: readQNodes[i].succedents[j] = &qNodes[j];
975: qNodes[j].antecedents[numDataNodes + i] =
976: &readQNodes[i];
977: qNodes[j].antType[numDataNodes + i] =
978: rf_trueData;
979: }
980: }
981: }
982: /* Connect xor nodes to commit node. */
983: RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
984: for (i = 0; i < numParityNodes; i++) {
985: RF_ASSERT(xorNodes[i].numSuccedents == 1);
986: xorNodes[i].succedents[0] = commitNode;
987: commitNode->antecedents[i] = &xorNodes[i];
988: commitNode->antType[i] = rf_control;
989: }
990:
991: /* Connect q nodes to commit node. */
992: if (nfaults == 2) {
993: for (i = 0; i < numParityNodes; i++) {
994: RF_ASSERT(qNodes[i].numSuccedents == 1);
995: qNodes[i].succedents[0] = commitNode;
996: commitNode->antecedents[i + numParityNodes] =
997: &qNodes[i];
998: commitNode->antType[i + numParityNodes] = rf_control;
999: }
1000: }
1001: /* Connect commit node to write nodes. */
1002: RF_ASSERT(commitNode->numSuccedents ==
1003: (numDataNodes + (nfaults * numParityNodes)));
1004: for (i = 0; i < numDataNodes; i++) {
1005: RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
1006: commitNode->succedents[i] = &writeDataNodes[i];
1007: writeDataNodes[i].antecedents[0] = commitNode;
1008: writeDataNodes[i].antType[0] = rf_trueData;
1009: }
1010: for (i = 0; i < numParityNodes; i++) {
1011: RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
1012: commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
1013: writeParityNodes[i].antecedents[0] = commitNode;
1014: writeParityNodes[i].antType[0] = rf_trueData;
1015: }
1016: if (nfaults == 2) {
1017: for (i = 0; i < numParityNodes; i++) {
1018: RF_ASSERT(writeQNodes[i].numAntecedents == 1);
1019: commitNode->succedents
1020: [i + numDataNodes + numParityNodes] =
1021: &writeQNodes[i];
1022: writeQNodes[i].antecedents[0] = commitNode;
1023: writeQNodes[i].antType[0] = rf_trueData;
1024: }
1025: }
1026: RF_ASSERT(termNode->numAntecedents ==
1027: (numDataNodes + (nfaults * numParityNodes)));
1028: RF_ASSERT(termNode->numSuccedents == 0);
1029: for (i = 0; i < numDataNodes; i++) {
1030: if (lu_flag) {
1031: /* Connect write new data nodes to unlock nodes. */
1032: RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1033: RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1034: writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1035: unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1036: unlockDataNodes[i].antType[0] = rf_control;
1037:
1038: /* Connect unlock nodes to term node. */
1039: RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1040: unlockDataNodes[i].succedents[0] = termNode;
1041: termNode->antecedents[i] = &unlockDataNodes[i];
1042: termNode->antType[i] = rf_control;
1043: } else {
1044: /* Connect write new data nodes to term node. */
1045: RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1046: RF_ASSERT(termNode->numAntecedents ==
1047: (numDataNodes + (nfaults * numParityNodes)));
1048: writeDataNodes[i].succedents[0] = termNode;
1049: termNode->antecedents[i] = &writeDataNodes[i];
1050: termNode->antType[i] = rf_control;
1051: }
1052: }
1053:
1054: for (i = 0; i < numParityNodes; i++) {
1055: if (lu_flag) {
1056: /* Connect write new parity nodes to unlock nodes. */
1057: RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1058: RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1059: writeParityNodes[i].succedents[0] =
1060: &unlockParityNodes[i];
1061: unlockParityNodes[i].antecedents[0] =
1062: &writeParityNodes[i];
1063: unlockParityNodes[i].antType[0] = rf_control;
1064:
1065: /* Connect unlock nodes to term node. */
1066: RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1067: unlockParityNodes[i].succedents[0] = termNode;
1068: termNode->antecedents[numDataNodes + i] =
1069: &unlockParityNodes[i];
1070: termNode->antType[numDataNodes + i] = rf_control;
1071: } else {
1072: RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1073: writeParityNodes[i].succedents[0] = termNode;
1074: termNode->antecedents[numDataNodes + i] =
1075: &writeParityNodes[i];
1076: termNode->antType[numDataNodes + i] = rf_control;
1077: }
1078: }
1079:
1080: if (nfaults == 2) {
1081: for (i = 0; i < numParityNodes; i++) {
1082: if (lu_flag) {
1083: /* Connect write new Q nodes to unlock nodes. */
1084: RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1085: RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1086: writeQNodes[i].succedents[0] = &unlockQNodes[i];
1087: unlockQNodes[i].antecedents[0] =
1088: &writeQNodes[i];
1089: unlockQNodes[i].antType[0] = rf_control;
1090:
1091: /* Connect unlock nodes to term node. */
1092: RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1093: unlockQNodes[i].succedents[0] = termNode;
1094: termNode->antecedents
1095: [numDataNodes + numParityNodes + i] =
1096: &unlockQNodes[i];
1097: termNode->antType
1098: [numDataNodes + numParityNodes + i] =
1099: rf_control;
1100: } else {
1101: RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1102: writeQNodes[i].succedents[0] = termNode;
1103: termNode->antecedents
1104: [numDataNodes + numParityNodes + i] =
1105: &writeQNodes[i];
1106: termNode->antType
1107: [numDataNodes + numParityNodes + i] =
1108: rf_control;
1109: }
1110: }
1111: }
1112: }
1113:
1114:
1115: /*****************************************************************************
1116: * Create a write graph (fault-free or degraded) for RAID level 1.
1117: *
1118: * Hdr -> Commit -> Wpd -> Nil -> Trm
1119: *               -> Wsd ->
1120: *
1121: * The "Wpd" node writes data to the primary copy in the mirror pair.
1122: * The "Wsd" node writes data to the secondary copy in the mirror pair.
1123: *
1124: * Parameters: raidPtr - description of the physical array
1125: * asmap - logical & physical addresses for this access
1126: * bp - buffer ptr (holds write data)
1127: * flags - general flags (e.g. disk locking)
1128: * allocList - list of memory allocated in DAG creation
1129: *****************************************************************************/
1130:
1131: void
1132: rf_CreateRaidOneWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1133: RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1134: RF_AllocListElem_t *allocList)
1135: {
1136: RF_DagNode_t *unblockNode, *termNode, *commitNode;
1137: RF_DagNode_t *nodes, *wndNode, *wmirNode;
1138: int nWndNodes, nWmirNodes, i;
1139: RF_ReconUnitNum_t which_ru;
1140: RF_PhysDiskAddr_t *pda, *pdaP;
1141: RF_StripeNum_t parityStripeID;
1142:
1143: parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1144: asmap->raidAddress, &which_ru);
1145: if (rf_dagDebug) {
1146: printf("[Creating RAID level 1 write DAG]\n");
1147: }
1148: dag_h->creator = "RaidOneWriteDAG";
1149:
1150: /* 2 implies access not SU aligned. */
1151: nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
1152: nWndNodes = (asmap->physInfo->next) ? 2 : 1;
1153:
1154: /* Alloc the Wnd nodes and the Wmir node. */
1155: if (asmap->numDataFailed == 1)
1156: nWndNodes--;
1157: if (asmap->numParityFailed == 1)
1158: nWmirNodes--;
1159:
1160: /*
1161: * Total number of nodes = nWndNodes + nWmirNodes
1162: * + (commit + unblock + terminator)
1163: */
1164: RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
1165: (RF_DagNode_t *), allocList);
1166: i = 0;
1167: wndNode = &nodes[i];
1168: i += nWndNodes;
1169: wmirNode = &nodes[i];
1170: i += nWmirNodes;
1171: commitNode = &nodes[i];
1172: i += 1;
1173: unblockNode = &nodes[i];
1174: i += 1;
1175: termNode = &nodes[i];
1176: i += 1;
1177: RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
1178:
1179: /* This dag can commit immediately. */
1180: dag_h->numCommitNodes = 1;
1181: dag_h->numCommits = 0;
1182: dag_h->numSuccedents = 1;
1183:
1184: /* Initialize the commit, unblock, and term nodes. */
1185: rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
1186: rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0,
1187: dag_h, "Cmt", allocList);
1188: rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1189: rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0,
1190: dag_h, "Nil", allocList);
1191: rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1192: rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
1193:
1194: /* Initialize the wnd nodes. */
1195: if (nWndNodes > 0) {
1196: pda = asmap->physInfo;
1197: for (i = 0; i < nWndNodes; i++) {
1198: rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
1199: rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1200: rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
1201: "Wpd", allocList);
1202: RF_ASSERT(pda != NULL);
1203: wndNode[i].params[0].p = pda;
1204: wndNode[i].params[1].p = pda->bufPtr;
1205: wndNode[i].params[2].v = parityStripeID;
1206: wndNode[i].params[3].v =
1207: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1208: 0, 0, which_ru);
1209: pda = pda->next;
1210: }
1211: RF_ASSERT(pda == NULL);
1212: }
1213: /* Initialize the mirror nodes. */
1214: if (nWmirNodes > 0) {
1215: pda = asmap->physInfo;
1216: pdaP = asmap->parityInfo;
1217: for (i = 0; i < nWmirNodes; i++) {
1218: rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
1219: rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1220: rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
1221: "Wsd", allocList);
1222: RF_ASSERT(pda != NULL);
1223: wmirNode[i].params[0].p = pdaP;
1224: wmirNode[i].params[1].p = pda->bufPtr;
1225: wmirNode[i].params[2].v = parityStripeID;
1226: wmirNode[i].params[3].v =
1227: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1228: 0, 0, which_ru);
1229: pda = pda->next;
1230: pdaP = pdaP->next;
1231: }
1232: RF_ASSERT(pda == NULL);
1233: RF_ASSERT(pdaP == NULL);
1234: }
1235: /* Link the header node to the commit node. */
1236: RF_ASSERT(dag_h->numSuccedents == 1);
1237: RF_ASSERT(commitNode->numAntecedents == 0);
1238: dag_h->succedents[0] = commitNode;
1239:
1240: /* Link the commit node to the write nodes. */
1241: RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
1242: for (i = 0; i < nWndNodes; i++) {
1243: RF_ASSERT(wndNode[i].numAntecedents == 1);
1244: commitNode->succedents[i] = &wndNode[i];
1245: wndNode[i].antecedents[0] = commitNode;
1246: wndNode[i].antType[0] = rf_control;
1247: }
1248: for (i = 0; i < nWmirNodes; i++) {
1249: RF_ASSERT(wmirNode[i].numAntecedents == 1);
1250: commitNode->succedents[i + nWndNodes] = &wmirNode[i];
1251: wmirNode[i].antecedents[0] = commitNode;
1252: wmirNode[i].antType[0] = rf_control;
1253: }
1254:
1255: /* Link the write nodes to the unblock node. */
1256: RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
1257: for (i = 0; i < nWndNodes; i++) {
1258: RF_ASSERT(wndNode[i].numSuccedents == 1);
1259: wndNode[i].succedents[0] = unblockNode;
1260: unblockNode->antecedents[i] = &wndNode[i];
1261: unblockNode->antType[i] = rf_control;
1262: }
1263: for (i = 0; i < nWmirNodes; i++) {
1264: RF_ASSERT(wmirNode[i].numSuccedents == 1);
1265: wmirNode[i].succedents[0] = unblockNode;
1266: unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
1267: unblockNode->antType[i + nWndNodes] = rf_control;
1268: }
1269:
1270: /* Link the unblock node to the term node. */
1271: RF_ASSERT(unblockNode->numSuccedents == 1);
1272: RF_ASSERT(termNode->numAntecedents == 1);
1273: RF_ASSERT(termNode->numSuccedents == 0);
1274: unblockNode->succedents[0] = termNode;
1275: termNode->antecedents[0] = unblockNode;
1276: termNode->antType[0] = rf_control;
1277: }
1278:
1279:
1280:
1281: /*
1282: * DAGs that have no commit points.
1283: *
1284: * The following DAGs are used in forward and backward error recovery
1285: * experiments.
1286: * They are identical to the DAGs above this comment with the exception that
1287: * the commit points have been removed.
1288: */
1289:
1290:
1291: void
1292: rf_CommonCreateLargeWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1293: RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1294: RF_AllocListElem_t *allocList, int nfaults, int (*redFunc) (RF_DagNode_t *),
1295: int allowBufferRecycle)
1296: {
1297: RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
1298: RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
1299: int nWndNodes, nRodNodes, i, nodeNum, asmNum;
1300: RF_AccessStripeMapHeader_t *new_asm_h[2];
1301: RF_StripeNum_t parityStripeID;
1302: char *sosBuffer, *eosBuffer;
1303: RF_ReconUnitNum_t which_ru;
1304: RF_RaidLayout_t *layoutPtr;
1305: RF_PhysDiskAddr_t *pda;
1306:
1307: layoutPtr = &(raidPtr->Layout);
1308: parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1309: asmap->raidAddress, &which_ru);
1310:
1311: if (rf_dagDebug)
1312: printf("[Creating large-write DAG]\n");
1313: dag_h->creator = "LargeWriteDAGFwd";
1314:
1315: dag_h->numCommitNodes = 0;
1316: dag_h->numCommits = 0;
1317: dag_h->numSuccedents = 1;
1318:
1319: /* Alloc the nodes: Wnd, xor, commit, block, term, and Wnp. */
1320: nWndNodes = asmap->numStripeUnitsAccessed;
1321: RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
1322: (RF_DagNode_t *), allocList);
1323: i = 0;
1324: wndNodes = &nodes[i];
1325: i += nWndNodes;
1326: xorNode = &nodes[i];
1327: i += 1;
1328: wnpNode = &nodes[i];
1329: i += 1;
1330: blockNode = &nodes[i];
1331: i += 1;
1332: syncNode = &nodes[i];
1333: i += 1;
1334: termNode = &nodes[i];
1335: i += 1;
1336: if (nfaults == 2) {
1337: wnqNode = &nodes[i];
1338: i += 1;
1339: } else {
1340: wnqNode = NULL;
1341: }
1342: rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
1343: new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
1344: if (nRodNodes > 0) {
1345: RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
1346: (RF_DagNode_t *), allocList);
1347: } else {
1348: rodNodes = NULL;
1349: }
1350:
1351: /* Begin node initialization. */
1352: if (nRodNodes > 0) {
1353: rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1354: rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h,
1355: "Nil", allocList);
1356: rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1357: rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0,
1358: dag_h, "Nil", allocList);
1359: } else {
1360: rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1361: rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil",
1362: allocList);
1363: rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1364: rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h,
1365: "Nil", allocList);
1366: }
1367:
1368: rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1369: rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0,
1370: dag_h, "Trm", allocList);
1371:
1372: /* Initialize the Rod nodes. */
1373: for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
1374: if (new_asm_h[asmNum]) {
1375: pda = new_asm_h[asmNum]->stripeMap->physInfo;
1376: while (pda) {
1377: rf_InitNode(&rodNodes[nodeNum], rf_wait,
1378: RF_FALSE, rf_DiskReadFunc,
1379: rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1380: 1, 1, 4, 0, dag_h, "Rod", allocList);
1381: rodNodes[nodeNum].params[0].p = pda;
1382: rodNodes[nodeNum].params[1].p = pda->bufPtr;
1383: rodNodes[nodeNum].params[2].v = parityStripeID;
1384: rodNodes[nodeNum].params[3].v =
1385: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1386: 0, 0, which_ru);
1387: nodeNum++;
1388: pda = pda->next;
1389: }
1390: }
1391: }
1392: RF_ASSERT(nodeNum == nRodNodes);
1393:
1394: /* Initialize the wnd nodes. */
1395: pda = asmap->physInfo;
1396: for (i = 0; i < nWndNodes; i++) {
1397: rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
1398: rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1399: dag_h, "Wnd", allocList);
1400: RF_ASSERT(pda != NULL);
1401: wndNodes[i].params[0].p = pda;
1402: wndNodes[i].params[1].p = pda->bufPtr;
1403: wndNodes[i].params[2].v = parityStripeID;
1404: wndNodes[i].params[3].v =
1405: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1406: pda = pda->next;
1407: }
1408:
1409: /* Initialize the redundancy node. */
1410: rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc,
1411: NULL, 1, nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
1412: "Xr ", allocList);
1413: xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
1414: for (i = 0; i < nWndNodes; i++) {
1415: xorNode->params[2 * i + 0] =
1416: wndNodes[i].params[0]; /* pda */
1417: xorNode->params[2 * i + 1] =
1418: wndNodes[i].params[1]; /* buf ptr */
1419: }
1420: for (i = 0; i < nRodNodes; i++) {
1421: xorNode->params[2 * (nWndNodes + i) + 0] =
1422: rodNodes[i].params[0]; /* pda */
1423: xorNode->params[2 * (nWndNodes + i) + 1] =
1424: rodNodes[i].params[1]; /* buf ptr */
1425: }
1426: /* Xor node needs to get at RAID information. */
1427: xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
1428:
1429: /*
1430: * Look for an Rod node that reads a complete SU. If none, alloc a
1431: * buffer to receive the parity info. Note that we can't use a new
1432: * data buffer because it will not have gotten written when the xor
1433: * occurs.
1434: */
1435: if (allowBufferRecycle) {
1436: for (i = 0; i < nRodNodes; i++)
1437: if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)
1438: ->numSector == raidPtr->Layout.sectorsPerStripeUnit)
1439: break;
1440: }
1441: if ((!allowBufferRecycle) || (i == nRodNodes)) {
1442: RF_CallocAndAdd(xorNode->results[0], 1,
1443: rf_RaidAddressToByte(raidPtr,
1444: raidPtr->Layout.sectorsPerStripeUnit),
1445: (void *), allocList);
1446: } else
1447: xorNode->results[0] = rodNodes[i].params[1].p;
1448:
1449: /* Initialize the Wnp node. */
1450: rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
1451: rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1452: dag_h, "Wnp", allocList);
1453: wnpNode->params[0].p = asmap->parityInfo;
1454: wnpNode->params[1].p = xorNode->results[0];
1455: wnpNode->params[2].v = parityStripeID;
1456: wnpNode->params[3].v =
1457: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1458: /* parityInfo must describe entire parity unit. */
1459: RF_ASSERT(asmap->parityInfo->next == NULL);
1460:
1461: if (nfaults == 2) {
1462: /*
1463: * Never try to recycle a buffer for the Q calcuation in
1464: * addition to the parity. This would cause two buffers to
1465: * get smashed during the P and Q calculation, guaranteeing
1466: * one would be wrong.
1467: */
1468: RF_CallocAndAdd(xorNode->results[1], 1,
1469: rf_RaidAddressToByte(raidPtr,
1470: raidPtr->Layout.sectorsPerStripeUnit),
1471: (void *), allocList);
1472: rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
1473: rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1474: dag_h, "Wnq", allocList);
1475: wnqNode->params[0].p = asmap->qInfo;
1476: wnqNode->params[1].p = xorNode->results[1];
1477: wnqNode->params[2].v = parityStripeID;
1478: wnqNode->params[3].v =
1479: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1480: /* parityInfo must describe entire parity unit. */
1481: RF_ASSERT(asmap->parityInfo->next == NULL);
1482: }
1483:
1484: /* Connect nodes to form graph. */
1485:
1486: /* Connect dag header to block node. */
1487: RF_ASSERT(blockNode->numAntecedents == 0);
1488: dag_h->succedents[0] = blockNode;
1489:
1490: if (nRodNodes > 0) {
1491: /* Connect the block node to the Rod nodes. */
1492: RF_ASSERT(blockNode->numSuccedents == nRodNodes);
1493: RF_ASSERT(syncNode->numAntecedents == nRodNodes);
1494: for (i = 0; i < nRodNodes; i++) {
1495: RF_ASSERT(rodNodes[i].numAntecedents == 1);
1496: blockNode->succedents[i] = &rodNodes[i];
1497: rodNodes[i].antecedents[0] = blockNode;
1498: rodNodes[i].antType[0] = rf_control;
1499:
1500: /* Connect the Rod nodes to the Nil node. */
1501: RF_ASSERT(rodNodes[i].numSuccedents == 1);
1502: rodNodes[i].succedents[0] = syncNode;
1503: syncNode->antecedents[i] = &rodNodes[i];
1504: syncNode->antType[i] = rf_trueData;
1505: }
1506: } else {
1507: /* Connect the block node to the Nil node. */
1508: RF_ASSERT(blockNode->numSuccedents == 1);
1509: RF_ASSERT(syncNode->numAntecedents == 1);
1510: blockNode->succedents[0] = syncNode;
1511: syncNode->antecedents[0] = blockNode;
1512: syncNode->antType[0] = rf_control;
1513: }
1514:
1515: /* Connect the sync node to the Wnd nodes. */
1516: RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
1517: for (i = 0; i < nWndNodes; i++) {
1518: RF_ASSERT(wndNodes->numAntecedents == 1);
1519: syncNode->succedents[i] = &wndNodes[i];
1520: wndNodes[i].antecedents[0] = syncNode;
1521: wndNodes[i].antType[0] = rf_control;
1522: }
1523:
1524: /* Connect the sync node to the Xor node. */
1525: RF_ASSERT(xorNode->numAntecedents == 1);
1526: syncNode->succedents[nWndNodes] = xorNode;
1527: xorNode->antecedents[0] = syncNode;
1528: xorNode->antType[0] = rf_control;
1529:
1530: /* Connect the xor node to the write parity node. */
1531: RF_ASSERT(xorNode->numSuccedents == nfaults);
1532: RF_ASSERT(wnpNode->numAntecedents == 1);
1533: xorNode->succedents[0] = wnpNode;
1534: wnpNode->antecedents[0] = xorNode;
1535: wnpNode->antType[0] = rf_trueData;
1536: if (nfaults == 2) {
1537: RF_ASSERT(wnqNode->numAntecedents == 1);
1538: xorNode->succedents[1] = wnqNode;
1539: wnqNode->antecedents[0] = xorNode;
1540: wnqNode->antType[0] = rf_trueData;
1541: }
1542: /* Connect the write nodes to the term node. */
1543: RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
1544: RF_ASSERT(termNode->numSuccedents == 0);
1545: for (i = 0; i < nWndNodes; i++) {
1546: RF_ASSERT(wndNodes->numSuccedents == 1);
1547: wndNodes[i].succedents[0] = termNode;
1548: termNode->antecedents[i] = &wndNodes[i];
1549: termNode->antType[i] = rf_control;
1550: }
1551: RF_ASSERT(wnpNode->numSuccedents == 1);
1552: wnpNode->succedents[0] = termNode;
1553: termNode->antecedents[nWndNodes] = wnpNode;
1554: termNode->antType[nWndNodes] = rf_control;
1555: if (nfaults == 2) {
1556: RF_ASSERT(wnqNode->numSuccedents == 1);
1557: wnqNode->succedents[0] = termNode;
1558: termNode->antecedents[nWndNodes + 1] = wnqNode;
1559: termNode->antType[nWndNodes + 1] = rf_control;
1560: }
1561: }
1562:
1563:
1564: /*****************************************************************************
1565: *
1566: * Create a DAG to perform a small-write operation (either raid 5 or pq),
1567: * which is as follows:
1568: *
1569: * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
1570: *            \- Rod X- Wnd [Und] -------/
1571: *           [\- Rod X- Wnd [Und] ------/]
1572: *           [\- Roq - Q --> Wnq [Unq]-/]
1573: *
1574: * Rop = read old parity
1575: * Rod = read old data
1576: * Roq = read old "q"
1577: * Cmt = commit node
1578: * Und = unlock data disk
1579: * Unp = unlock parity disk
1580: * Unq = unlock q disk
1581: * Wnp = write new parity
1582: * Wnd = write new data
1583: * Wnq = write new "q"
1584: * [ ] denotes optional segments in the graph.
1585: *
1586: * Parameters: raidPtr - description of the physical array
1587: * asmap - logical & physical addresses for this access
1588: * bp - buffer ptr (holds write data)
1589: * flags - general flags (e.g. disk locking)
1590: * allocList - list of memory allocated in DAG creation
1591: * pfuncs - list of parity generating functions
1592: * qfuncs - list of q generating functions
1593: *
1594: * A null qfuncs indicates single fault tolerant.
1595: *****************************************************************************/
1596:
1597: void
1598: rf_CommonCreateSmallWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1599: RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1600: RF_AllocListElem_t *allocList, RF_RedFuncs_t *pfuncs, RF_RedFuncs_t *qfuncs)
1601: {
1602: RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
1603: RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
1604: RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
1605: RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
1606: int i, j, nNodes, totalNumNodes, lu_flag;
1607: RF_ReconUnitNum_t which_ru;
1608: int (*func) (RF_DagNode_t *);
1609: int (*undoFunc) (RF_DagNode_t *);
1610: int (*qfunc) (RF_DagNode_t *);
1611: int numDataNodes, numParityNodes;
1612: RF_StripeNum_t parityStripeID;
1613: RF_PhysDiskAddr_t *pda;
1614: char *name, *qname;
1615: long nfaults;
1616:
1617: nfaults = qfuncs ? 2 : 1;
1618: lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* Lock/unlock flag. */
1619:
1620: parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1621: asmap->raidAddress, &which_ru);
1622: pda = asmap->physInfo;
1623: numDataNodes = asmap->numStripeUnitsAccessed;
1624: numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
1625:
1626: if (rf_dagDebug)
1627: printf("[Creating small-write DAG]\n");
1628: RF_ASSERT(numDataNodes > 0);
1629: dag_h->creator = "SmallWriteDAGFwd";
1630:
1631: dag_h->numCommitNodes = 0;
1632: dag_h->numCommits = 0;
1633: dag_h->numSuccedents = 1;
1634:
1635: qfunc = NULL;
1636: qname = NULL;
1637:
1638: /*
1639: * DAG creation occurs in four steps:
1640: * 1. Count the number of nodes in the DAG.
1641: * 2. Create the nodes.
1642: * 3. Initialize the nodes.
1643: * 4. Connect the nodes.
1644: */
1645:
1646: /* Step 1. Compute number of nodes in the graph. */
1647:
1648: /*
1649: * Number of nodes: a read and write for each data unit, a redundancy
1650: * computation node for each parity node (nfaults * nparity), a read
1651: * and write for each parity unit, a block node, and a terminate node;
1652: * if atomic RMW, also an unlock node for each data/redundancy unit.
1653: */
1654: totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
1655: + (nfaults * 2 * numParityNodes) + 2;
1656: if (lu_flag)
1657: totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
1658:
1659:
1660: /* Step 2. Create the nodes. */
1661: RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
1662: (RF_DagNode_t *), allocList);
1663: i = 0;
1664: blockNode = &nodes[i];
1665: i += 1;
1666: readDataNodes = &nodes[i];
1667: i += numDataNodes;
1668: readParityNodes = &nodes[i];
1669: i += numParityNodes;
1670: writeDataNodes = &nodes[i];
1671: i += numDataNodes;
1672: writeParityNodes = &nodes[i];
1673: i += numParityNodes;
1674: xorNodes = &nodes[i];
1675: i += numParityNodes;
1676: termNode = &nodes[i];
1677: i += 1;
1678: if (lu_flag) {
1679: unlockDataNodes = &nodes[i];
1680: i += numDataNodes;
1681: unlockParityNodes = &nodes[i];
1682: i += numParityNodes;
1683: } else {
1684: unlockDataNodes = unlockParityNodes = NULL;
1685: }
1686: if (nfaults == 2) {
1687: readQNodes = &nodes[i];
1688: i += numParityNodes;
1689: writeQNodes = &nodes[i];
1690: i += numParityNodes;
1691: qNodes = &nodes[i];
1692: i += numParityNodes;
1693: if (lu_flag) {
1694: unlockQNodes = &nodes[i];
1695: i += numParityNodes;
1696: } else {
1697: unlockQNodes = NULL;
1698: }
1699: } else {
1700: readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
1701: }
1702: RF_ASSERT(i == totalNumNodes);
1703:
1704: /* Step 3. Initialize the nodes. */
1705: /* Initialize block node (Nil). */
1706: nNodes = numDataNodes + (nfaults * numParityNodes);
1707: rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1708: rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h,
1709: "Nil", allocList);
1710:
1711: /* Initialize terminate node (Trm). */
1712: rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1713: rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h,
1714: "Trm", allocList);
1715:
1716: /* Initialize nodes which read old data (Rod). */
1717: for (i = 0; i < numDataNodes; i++) {
1718: rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
1719: rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1720: (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h,
1721: "Rod", allocList);
1722: RF_ASSERT(pda != NULL);
1723: /* Physical disk addr desc. */
1724: readDataNodes[i].params[0].p = pda;
1725: /* Buffer to hold old data. */
1726: readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h,
1727: pda, allocList);
1728: readDataNodes[i].params[2].v = parityStripeID;
1729: readDataNodes[i].params[3].v =
1730: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1731: lu_flag, 0, which_ru);
1732: pda = pda->next;
1733: for (j = 0; j < readDataNodes[i].numSuccedents; j++)
1734: readDataNodes[i].propList[j] = NULL;
1735: }
1736:
1737: /* Initialize nodes which read old parity (Rop). */
1738: pda = asmap->parityInfo;
1739: i = 0;
1740: for (i = 0; i < numParityNodes; i++) {
1741: RF_ASSERT(pda != NULL);
1742: rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
1743: rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1744: numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
1745: readParityNodes[i].params[0].p = pda;
1746: /* Buffer to hold old parity. */
1747: readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
1748: dag_h, pda, allocList);
1749: readParityNodes[i].params[2].v = parityStripeID;
1750: readParityNodes[i].params[3].v =
1751: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1752: lu_flag, 0, which_ru);
1753: for (j = 0; j < readParityNodes[i].numSuccedents; j++)
1754: readParityNodes[i].propList[0] = NULL;
1755: pda = pda->next;
1756: }
1757:
1758: /* Initialize nodes which read old Q (Roq). */
1759: if (nfaults == 2) {
1760: pda = asmap->qInfo;
1761: for (i = 0; i < numParityNodes; i++) {
1762: RF_ASSERT(pda != NULL);
1763: rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
1764: rf_DiskReadFunc, rf_DiskReadUndoFunc,
1765: rf_GenericWakeupFunc, numParityNodes, 1, 4, 0,
1766: dag_h, "Roq", allocList);
1767: readQNodes[i].params[0].p = pda;
1768: /* Buffer to hold old Q. */
1769: readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
1770: dag_h, pda, allocList);
1771: readQNodes[i].params[2].v = parityStripeID;
1772: readQNodes[i].params[3].v =
1773: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1774: lu_flag, 0, which_ru);
1775: for (j = 0; j < readQNodes[i].numSuccedents; j++)
1776: readQNodes[i].propList[0] = NULL;
1777: pda = pda->next;
1778: }
1779: }
1780: /* Initialize nodes which write new data (Wnd). */
1781: pda = asmap->physInfo;
1782: for (i = 0; i < numDataNodes; i++) {
1783: RF_ASSERT(pda != NULL);
1784: rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
1785: rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1786: rf_GenericWakeupFunc, 1, 1, 4, 0,
1787: dag_h, "Wnd", allocList);
1788: /* Physical disk addr desc. */
1789: writeDataNodes[i].params[0].p = pda;
1790: /* Buffer holding new data to be written. */
1791: writeDataNodes[i].params[1].p = pda->bufPtr;
1792: writeDataNodes[i].params[2].v = parityStripeID;
1793: writeDataNodes[i].params[3].v =
1794: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1795:
1796: if (lu_flag) {
1797: /* Initialize node to unlock the disk queue. */
1798: rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE,
1799: rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
1800: rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
1801: "Und", allocList);
1802: /* Physical disk addr desc. */
1803: unlockDataNodes[i].params[0].p = pda;
1804: unlockDataNodes[i].params[1].v =
1805: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1806: 0, lu_flag, which_ru);
1807: }
1808: pda = pda->next;
1809: }
1810:
1811:
1812: /* Initialize nodes which compute new parity and Q. */
1813: /*
1814: * Use the simple XOR func in the double-XOR case, and when
1815: * accessing only a portion of one stripe unit. The distinction
1816: * between the two is that the regular XOR func assumes that the
1817: * targbuf is a full SU in size, and examines the pda associated with
1818: * the buffer to decide where within the buffer to XOR the data,
1819: * whereas the simple XOR func just XORs the data into the start of
1820: * the buffer.
1821: */
1822: if ((numParityNodes == 2) || ((numDataNodes == 1) &&
1823: (asmap->totalSectorsAccessed <
1824: raidPtr->Layout.sectorsPerStripeUnit))) {
1825: func = pfuncs->simple;
1826: undoFunc = rf_NullNodeUndoFunc;
1827: name = pfuncs->SimpleName;
1828: if (qfuncs) {
1829: qfunc = qfuncs->simple;
1830: qname = qfuncs->SimpleName;
1831: }
1832: } else {
1833: func = pfuncs->regular;
1834: undoFunc = rf_NullNodeUndoFunc;
1835: name = pfuncs->RegularName;
1836: if (qfuncs) {
1837: qfunc = qfuncs->regular;
1838: qname = qfuncs->RegularName;
1839: }
1840: }
1841: /*
1842: * Initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop}
1843: * nodes, and raidPtr.
1844: */
1845: if (numParityNodes == 2) { /* Double-xor case. */
1846: for (i = 0; i < numParityNodes; i++) {
1847: /* No wakeup func for xor. */
1848: rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
1849: undoFunc, NULL, numParityNodes, numParityNodes +
1850: numDataNodes, 7, 1, dag_h, name, allocList);
1851: xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
1852: xorNodes[i].params[0] = readDataNodes[i].params[0];
1853: xorNodes[i].params[1] = readDataNodes[i].params[1];
1854: xorNodes[i].params[2] = readParityNodes[i].params[0];
1855: xorNodes[i].params[3] = readParityNodes[i].params[1];
1856: xorNodes[i].params[4] = writeDataNodes[i].params[0];
1857: xorNodes[i].params[5] = writeDataNodes[i].params[1];
1858: xorNodes[i].params[6].p = raidPtr;
1859: /* Use old parity buf as target buf. */
1860: xorNodes[i].results[0] = readParityNodes[i].params[1].p;
1861: if (nfaults == 2) {
1862: /* No wakeup func for xor. */
1863: rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
1864: qfunc, undoFunc, NULL, numParityNodes,
1865: numParityNodes + numDataNodes, 7, 1,
1866: dag_h, qname, allocList);
1867: qNodes[i].params[0] =
1868: readDataNodes[i].params[0];
1869: qNodes[i].params[1] =
1870: readDataNodes[i].params[1];
1871: qNodes[i].params[2] = readQNodes[i].params[0];
1872: qNodes[i].params[3] = readQNodes[i].params[1];
1873: qNodes[i].params[4] =
1874: writeDataNodes[i].params[0];
1875: qNodes[i].params[5] =
1876: writeDataNodes[i].params[1];
1877: qNodes[i].params[6].p = raidPtr;
1878: /* Use old Q buf as target buf. */
1879: qNodes[i].results[0] =
1880: readQNodes[i].params[1].p;
1881: }
1882: }
1883: } else {
1884: /* There is only one xor node in this case. */
1885: rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc,
1886: NULL, numParityNodes, numParityNodes + numDataNodes,
1887: (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
1888: name, allocList);
1889: xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
1890: for (i = 0; i < numDataNodes + 1; i++) {
1891: /* Set up params related to Rod and Rop nodes. */
1892: xorNodes[0].params[2 * i + 0] =
1893: readDataNodes[i].params[0]; /* pda */
1894: xorNodes[0].params[2 * i + 1] =
1895: readDataNodes[i].params[1]; /* buffer pointer */
1896: }
1897: for (i = 0; i < numDataNodes; i++) {
1898: /* Set up params related to Wnd and Wnp nodes. */
1899: xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =
1900: writeDataNodes[i].params[0]; /* pda */
1901: xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =
1902: writeDataNodes[i].params[1]; /* buffer pointer */
1903: }
1904: xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p =
1905: raidPtr; /* xor node needs to get at RAID information */
1906: xorNodes[0].results[0] = readParityNodes[0].params[1].p;
1907: if (nfaults == 2) {
1908: rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
1909: undoFunc, NULL, numParityNodes,
1910: numParityNodes + numDataNodes,
1911: (2 * (numDataNodes + numDataNodes + 1) + 1),
1912: 1, dag_h, qname, allocList);
1913: for (i = 0; i < numDataNodes; i++) {
1914: /* Set up params related to Rod. */
1915: /* pda */
1916: qNodes[0].params[2 * i + 0] =
1917: readDataNodes[i].params[0];
1918: /* buffer pointer */
1919: qNodes[0].params[2 * i + 1] =
1920: readDataNodes[i].params[1];
1921: }
1922: /* And read old q. */
1923: qNodes[0].params[2 * numDataNodes + 0] =
1924: readQNodes[0].params[0]; /* pda */
1925: qNodes[0].params[2 * numDataNodes + 1] =
1926: readQNodes[0].params[1]; /* buffer pointer */
1927: for (i = 0; i < numDataNodes; i++) {
1928: /* Set up params related to Wnd nodes. */
1929: /* pda */
1930: qNodes[0].params
1931: [2 * (numDataNodes + 1 + i) + 0] =
1932: writeDataNodes[i].params[0];
1933: /* buffer pointer */
1934: qNodes[0].params
1935: [2 * (numDataNodes + 1 + i) + 1] =
1936: writeDataNodes[i].params[1];
1937: }
1938: /* Xor node needs to get at RAID information. */
1939: qNodes[0].params
1940: [2 * (numDataNodes + numDataNodes + 1)].p =
1941: raidPtr;
1942: qNodes[0].results[0] = readQNodes[0].params[1].p;
1943: }
1944: }
1945:
1946: /* Initialize nodes which write new parity (Wnp). */
1947: pda = asmap->parityInfo;
1948: for (i = 0; i < numParityNodes; i++) {
1949: rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
1950: rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1951: rf_GenericWakeupFunc, 1, numParityNodes,
1952: 4, 0, dag_h, "Wnp", allocList);
1953: RF_ASSERT(pda != NULL);
1954: /* Param 1 (bufPtr) filled in by xor node. */
1955: writeParityNodes[i].params[0].p = pda;
1956: /* Buffer pointer for parity write operation. */
1957: writeParityNodes[i].params[1].p = xorNodes[i].results[0];
1958: writeParityNodes[i].params[2].v = parityStripeID;
1959: writeParityNodes[i].params[3].v =
1960: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1961:
1962: if (lu_flag) {
1963: /* Initialize node to unlock the disk queue. */
1964: rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE,
1965: rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
1966: rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
1967: "Unp", allocList);
1968: unlockParityNodes[i].params[0].p =
1969: pda; /* Physical disk addr desc. */
1970: unlockParityNodes[i].params[1].v =
1971: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1972: 0, lu_flag, which_ru);
1973: }
1974: pda = pda->next;
1975: }
1976:
1977: /* Initialize nodes which write new Q (Wnq). */
1978: if (nfaults == 2) {
1979: pda = asmap->qInfo;
1980: for (i = 0; i < numParityNodes; i++) {
1981: rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
1982: rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1983: rf_GenericWakeupFunc, 1, numParityNodes,
1984: 4, 0, dag_h, "Wnq", allocList);
1985: RF_ASSERT(pda != NULL);
1986: /* Param 1 (bufPtr) filled in by xor node. */
1987: writeQNodes[i].params[0].p = pda;
1988: /* Buffer pointer for parity write operation. */
1989: writeQNodes[i].params[1].p = qNodes[i].results[0];
1990: writeQNodes[i].params[2].v = parityStripeID;
1991: writeQNodes[i].params[3].v =
1992: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1993: 0, 0, which_ru);
1994:
1995: if (lu_flag) {
1996: /* Initialize node to unlock the disk queue. */
1997: rf_InitNode(&unlockQNodes[i], rf_wait,
1998: RF_FALSE, rf_DiskUnlockFunc,
1999: rf_DiskUnlockUndoFunc,
2000: rf_GenericWakeupFunc, 1, 1, 2, 0,
2001: dag_h, "Unq", allocList);
2002: /* Physical disk addr desc. */
2003: unlockQNodes[i].params[0].p = pda;
2004: unlockQNodes[i].params[1].v =
2005: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2006: 0, lu_flag, which_ru);
2007: }
2008: pda = pda->next;
2009: }
2010: }
2011: /* Step 4. Connect the nodes. */
2012:
2013: /* Connect header to block node. */
2014: dag_h->succedents[0] = blockNode;
2015:
2016: /* Connect block node to read old data nodes. */
2017: RF_ASSERT(blockNode->numSuccedents ==
2018: (numDataNodes + (numParityNodes * nfaults)));
2019: for (i = 0; i < numDataNodes; i++) {
2020: blockNode->succedents[i] = &readDataNodes[i];
2021: RF_ASSERT(readDataNodes[i].numAntecedents == 1);
2022: readDataNodes[i].antecedents[0] = blockNode;
2023: readDataNodes[i].antType[0] = rf_control;
2024: }
2025:
2026: /* Connect block node to read old parity nodes. */
2027: for (i = 0; i < numParityNodes; i++) {
2028: blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
2029: RF_ASSERT(readParityNodes[i].numAntecedents == 1);
2030: readParityNodes[i].antecedents[0] = blockNode;
2031: readParityNodes[i].antType[0] = rf_control;
2032: }
2033:
2034: /* Connect block node to read old Q nodes. */
2035: if (nfaults == 2)
2036: for (i = 0; i < numParityNodes; i++) {
2037: blockNode->succedents[numDataNodes +
2038: numParityNodes + i] = &readQNodes[i];
2039: RF_ASSERT(readQNodes[i].numAntecedents == 1);
2040: readQNodes[i].antecedents[0] = blockNode;
2041: readQNodes[i].antType[0] = rf_control;
2042: }
2043:
2044: /* Connect read old data nodes to write new data nodes. */
2045: for (i = 0; i < numDataNodes; i++) {
2046: RF_ASSERT(readDataNodes[i].numSuccedents ==
2047: ((nfaults * numParityNodes) + 1));
2048: RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
2049: readDataNodes[i].succedents[0] = &writeDataNodes[i];
2050: writeDataNodes[i].antecedents[0] = &readDataNodes[i];
2051: writeDataNodes[i].antType[0] = rf_antiData;
2052: }
2053:
2054: /* Connect read old data nodes to xor nodes. */
2055: for (i = 0; i < numDataNodes; i++) {
2056: for (j = 0; j < numParityNodes; j++) {
2057: RF_ASSERT(xorNodes[j].numAntecedents ==
2058: numDataNodes + numParityNodes);
2059: readDataNodes[i].succedents[1 + j] = &xorNodes[j];
2060: xorNodes[j].antecedents[i] = &readDataNodes[i];
2061: xorNodes[j].antType[i] = rf_trueData;
2062: }
2063: }
2064:
2065: /* Connect read old data nodes to q nodes. */
2066: if (nfaults == 2)
2067: for (i = 0; i < numDataNodes; i++)
2068: for (j = 0; j < numParityNodes; j++) {
2069: RF_ASSERT(qNodes[j].numAntecedents ==
2070: numDataNodes + numParityNodes);
2071: readDataNodes[i].succedents
2072: [1 + numParityNodes + j] = &qNodes[j];
2073: qNodes[j].antecedents[i] = &readDataNodes[i];
2074: qNodes[j].antType[i] = rf_trueData;
2075: }
2076:
2077: /* Connect read old parity nodes to xor nodes. */
2078: for (i = 0; i < numParityNodes; i++) {
2079: for (j = 0; j < numParityNodes; j++) {
2080: RF_ASSERT(readParityNodes[i].numSuccedents ==
2081: numParityNodes);
2082: readParityNodes[i].succedents[j] = &xorNodes[j];
2083: xorNodes[j].antecedents[numDataNodes + i] =
2084: &readParityNodes[i];
2085: xorNodes[j].antType[numDataNodes + i] = rf_trueData;
2086: }
2087: }
2088:
2089: /* Connect read old q nodes to q nodes. */
2090: if (nfaults == 2)
2091: for (i = 0; i < numParityNodes; i++) {
2092: for (j = 0; j < numParityNodes; j++) {
2093: RF_ASSERT(readQNodes[i].numSuccedents ==
2094: numParityNodes);
2095: readQNodes[i].succedents[j] = &qNodes[j];
2096: qNodes[j].antecedents[numDataNodes + i] =
2097: &readQNodes[i];
2098: qNodes[j].antType[numDataNodes + i] =
2099: rf_trueData;
2100: }
2101: }
2102:
2103: /* Connect xor nodes to the write new parity nodes. */
2104: for (i = 0; i < numParityNodes; i++) {
2105: RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
2106: for (j = 0; j < numParityNodes; j++) {
2107: RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
2108: xorNodes[i].succedents[j] = &writeParityNodes[j];
2109: writeParityNodes[j].antecedents[i] = &xorNodes[i];
2110: writeParityNodes[j].antType[i] = rf_trueData;
2111: }
2112: }
2113:
2114: /* Connect q nodes to the write new q nodes. */
2115: if (nfaults == 2)
2116: for (i = 0; i < numParityNodes; i++) {
2117: RF_ASSERT(writeQNodes[i].numAntecedents ==
2118: numParityNodes);
2119: for (j = 0; j < numParityNodes; j++) {
2120: RF_ASSERT(qNodes[j].numSuccedents == 1);
2121: qNodes[i].succedents[j] = &writeQNodes[j];
2122: writeQNodes[j].antecedents[i] = &qNodes[i];
2123: writeQNodes[j].antType[i] = rf_trueData;
2124: }
2125: }
2126:
2127: RF_ASSERT(termNode->numAntecedents ==
2128: (numDataNodes + (nfaults * numParityNodes)));
2129: RF_ASSERT(termNode->numSuccedents == 0);
2130: for (i = 0; i < numDataNodes; i++) {
2131: if (lu_flag) {
2132: /* Connect write new data nodes to unlock nodes. */
2133: RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
2134: RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
2135: writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
2136: unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
2137: unlockDataNodes[i].antType[0] = rf_control;
2138:
2139: /* Connect unlock nodes to term nodes. */
2140: RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
2141: unlockDataNodes[i].succedents[0] = termNode;
2142: termNode->antecedents[i] = &unlockDataNodes[i];
2143: termNode->antType[i] = rf_control;
2144: } else {
2145: /* Connect write new data nodes to term node. */
2146: RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
2147: RF_ASSERT(termNode->numAntecedents ==
2148: (numDataNodes + (nfaults * numParityNodes)));
2149: writeDataNodes[i].succedents[0] = termNode;
2150: termNode->antecedents[i] = &writeDataNodes[i];
2151: termNode->antType[i] = rf_control;
2152: }
2153: }
2154:
2155: for (i = 0; i < numParityNodes; i++) {
2156: if (lu_flag) {
2157: /* Connect write new parity nodes to unlock nodes. */
2158: RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2159: RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
2160: writeParityNodes[i].succedents[0] =
2161: &unlockParityNodes[i];
2162: unlockParityNodes[i].antecedents[0] =
2163: &writeParityNodes[i];
2164: unlockParityNodes[i].antType[0] = rf_control;
2165:
2166: /* Connect unlock nodes to term node. */
2167: RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
2168: unlockParityNodes[i].succedents[0] = termNode;
2169: termNode->antecedents[numDataNodes + i] =
2170: &unlockParityNodes[i];
2171: termNode->antType[numDataNodes + i] = rf_control;
2172: } else {
2173: RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2174: writeParityNodes[i].succedents[0] = termNode;
2175: termNode->antecedents[numDataNodes + i] =
2176: &writeParityNodes[i];
2177: termNode->antType[numDataNodes + i] = rf_control;
2178: }
2179: }
2180:
2181: if (nfaults == 2)
2182: for (i = 0; i < numParityNodes; i++) {
2183: if (lu_flag) {
2184: /* Connect write new Q nodes to unlock nodes. */
2185: RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2186: RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
2187: writeQNodes[i].succedents[0] = &unlockQNodes[i];
2188: unlockQNodes[i].antecedents[0] =
2189: &writeQNodes[i];
2190: unlockQNodes[i].antType[0] = rf_control;
2191:
2192: /* Connect unlock nodes to unblock node. */
2193: RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
2194: unlockQNodes[i].succedents[0] = termNode;
2195: termNode->antecedents[numDataNodes +
2196: numParityNodes + i] = &unlockQNodes[i];
2197: termNode->antType[numDataNodes +
2198: numParityNodes + i] = rf_control;
2199: } else {
2200: RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2201: writeQNodes[i].succedents[0] = termNode;
2202: termNode->antecedents[numDataNodes +
2203: numParityNodes + i] = &writeQNodes[i];
2204: termNode->antType[numDataNodes +
2205: numParityNodes + i] = rf_control;
2206: }
2207: }
2208: }
2209:
2210:
2211:
2212: /*****************************************************************************
2213: * Create a write graph (fault-free or degraded) for RAID level 1.
2214: *
2215: * Hdr Nil -> Wpd -> Nil -> Trm
2216: * Nil -> Wsd ->
2217: *
2218: * The "Wpd" node writes data to the primary copy in the mirror pair.
2219: * The "Wsd" node writes data to the secondary copy in the mirror pair.
2220: *
2221: * Parameters: raidPtr - description of the physical array
2222: * asmap - logical & physical addresses for this access
2223: * bp - buffer ptr (holds write data)
2224: * flags - general flags (e.g. disk locking)
2225: * allocList - list of memory allocated in DAG creation
2226: *****************************************************************************/
2227:
2228: void
2229: rf_CreateRaidOneWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
2230: RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
2231: RF_AllocListElem_t *allocList)
2232: {
2233: RF_DagNode_t *blockNode, *unblockNode, *termNode;
2234: RF_DagNode_t *nodes, *wndNode, *wmirNode;
2235: int nWndNodes, nWmirNodes, i;
2236: RF_ReconUnitNum_t which_ru;
2237: RF_PhysDiskAddr_t *pda, *pdaP;
2238: RF_StripeNum_t parityStripeID;
2239:
2240: parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
2241: asmap->raidAddress, &which_ru);
2242: if (rf_dagDebug) {
2243: printf("[Creating RAID level 1 write DAG]\n");
2244: }
2245: /* 2 implies access not SU aligned. */
2246: nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
2247: nWndNodes = (asmap->physInfo->next) ? 2 : 1;
2248:
2249: /* Alloc the Wnd nodes and the Wmir node. */
2250: if (asmap->numDataFailed == 1)
2251: nWndNodes--;
2252: if (asmap->numParityFailed == 1)
2253: nWmirNodes--;
2254:
2255: /*
2256: * Total number of nodes = nWndNodes + nWmirNodes +
2257: * (block + unblock + terminator)
2258: */
2259: RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3,
2260: sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
2261: i = 0;
2262: wndNode = &nodes[i];
2263: i += nWndNodes;
2264: wmirNode = &nodes[i];
2265: i += nWmirNodes;
2266: blockNode = &nodes[i];
2267: i += 1;
2268: unblockNode = &nodes[i];
2269: i += 1;
2270: termNode = &nodes[i];
2271: i += 1;
2272: RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
2273:
2274: /* This dag can commit immediately. */
2275: dag_h->numCommitNodes = 0;
2276: dag_h->numCommits = 0;
2277: dag_h->numSuccedents = 1;
2278:
2279: /* Initialize the unblock and term nodes. */
2280: rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
2281: rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes),
2282: 0, 0, 0, dag_h, "Nil", allocList);
2283: rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
2284: rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes),
2285: 0, 0, dag_h, "Nil", allocList);
2286: rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
2287: rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
2288:
2289: /* Initialize the wnd nodes. */
2290: if (nWndNodes > 0) {
2291: pda = asmap->physInfo;
2292: for (i = 0; i < nWndNodes; i++) {
2293: rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
2294: rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
2295: rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
2296: "Wpd", allocList);
2297: RF_ASSERT(pda != NULL);
2298: wndNode[i].params[0].p = pda;
2299: wndNode[i].params[1].p = pda->bufPtr;
2300: wndNode[i].params[2].v = parityStripeID;
2301: wndNode[i].params[3].v =
2302: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2303: 0, 0, which_ru);
2304: pda = pda->next;
2305: }
2306: RF_ASSERT(pda == NULL);
2307: }
2308: /* Initialize the mirror nodes. */
2309: if (nWmirNodes > 0) {
2310: pda = asmap->physInfo;
2311: pdaP = asmap->parityInfo;
2312: for (i = 0; i < nWmirNodes; i++) {
2313: rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
2314: rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
2315: rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
2316: "Wsd", allocList);
2317: RF_ASSERT(pda != NULL);
2318: wmirNode[i].params[0].p = pdaP;
2319: wmirNode[i].params[1].p = pda->bufPtr;
2320: wmirNode[i].params[2].v = parityStripeID;
2321: wmirNode[i].params[3].v =
2322: RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2323: 0, 0, which_ru);
2324: pda = pda->next;
2325: pdaP = pdaP->next;
2326: }
2327: RF_ASSERT(pda == NULL);
2328: RF_ASSERT(pdaP == NULL);
2329: }
2330: /* Link the header node to the block node. */
2331: RF_ASSERT(dag_h->numSuccedents == 1);
2332: RF_ASSERT(blockNode->numAntecedents == 0);
2333: dag_h->succedents[0] = blockNode;
2334:
2335: /* Link the block node to the write nodes. */
2336: RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
2337: for (i = 0; i < nWndNodes; i++) {
2338: RF_ASSERT(wndNode[i].numAntecedents == 1);
2339: blockNode->succedents[i] = &wndNode[i];
2340: wndNode[i].antecedents[0] = blockNode;
2341: wndNode[i].antType[0] = rf_control;
2342: }
2343: for (i = 0; i < nWmirNodes; i++) {
2344: RF_ASSERT(wmirNode[i].numAntecedents == 1);
2345: blockNode->succedents[i + nWndNodes] = &wmirNode[i];
2346: wmirNode[i].antecedents[0] = blockNode;
2347: wmirNode[i].antType[0] = rf_control;
2348: }
2349:
2350: /* Link the write nodes to the unblock node. */
2351: RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
2352: for (i = 0; i < nWndNodes; i++) {
2353: RF_ASSERT(wndNode[i].numSuccedents == 1);
2354: wndNode[i].succedents[0] = unblockNode;
2355: unblockNode->antecedents[i] = &wndNode[i];
2356: unblockNode->antType[i] = rf_control;
2357: }
2358: for (i = 0; i < nWmirNodes; i++) {
2359: RF_ASSERT(wmirNode[i].numSuccedents == 1);
2360: wmirNode[i].succedents[0] = unblockNode;
2361: unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
2362: unblockNode->antType[i + nWndNodes] = rf_control;
2363: }
2364:
2365: /* Link the unblock node to the term node. */
2366: RF_ASSERT(unblockNode->numSuccedents == 1);
2367: RF_ASSERT(termNode->numAntecedents == 1);
2368: RF_ASSERT(termNode->numSuccedents == 0);
2369: unblockNode->succedents[0] = termNode;
2370: termNode->antecedents[0] = unblockNode;
2371: termNode->antType[0] = rf_control;
2372:
2373: return;
2374: }
CVSweb