#define bufTrigger 3 #define bufFactor 2 #define wrap 13 #define queueSize 2 mtype = {data,cp}; typedef dataMsg { mtype type; byte cpId; byte cpPos; } /* checkpoints for endpoints */ byte ackedId[2]; byte ackedSent[2]; byte ackedRecd[2]; byte pendId[2]; byte pendSent[2]; byte pendRecd[2]; byte ready[2]; init { /* channels */ chan point1Recv = [queueSize] of {dataMsg}; chan point2Recv = [queueSize] of {dataMsg}; /* start processes */ run Initiator(point1Recv, point2Recv); run Noninitiator(point2Recv, point1Recv); } proctype Initiator(chan recv, send) { /* send and receive counters */ byte sent = 0; byte recd = 0; /* checkpoints */ ackedId[0] = 0; ackedSent[0] = 0; ackedRecd[0] = 0; pendId[0] = 1; pendSent[0] = 0; pendRecd[0] = 0; ready[0] = true; /* indicate whether a reply is expected */ bool recvCpAck = false; /* messages */ dataMsg recvMsg; dataMsg sendDataMsg; sendDataMsg.type = data; sendDataMsg.cpId = 0; sendDataMsg.cpPos = 0; dataMsg sendCpMsg; sendCpMsg.type = cp; ACTIVE: assert(sent <= bufTrigger*bufFactor && recd <= bufTrigger*bufFactor); if :: (sent == bufTrigger*bufFactor || recd == bufTrigger*bufFactor) && recvCpAck == true; recv?recvMsg; goto ACTIVE_RECV; :: (sent == bufTrigger*bufFactor || recd == bufTrigger*bufFactor) && recvCpAck == false; /* create checkpoint */ d_step { /* move the reference points */ sent = sent - pendSent[0]; recd = recd - pendRecd[0]; /* overwrite old acked and update absolute counters */ ackedId[0] = pendId[0]; ackedSent[0] = ((ackedSent[0] + pendSent[0]) % wrap); ackedRecd[0] = ((ackedRecd[0] + pendRecd[0]) % wrap); /* new pend */ pendId[0] = (pendId[0] + 1) % 3; pendSent[0] = sent; pendRecd[0] = recd; /* prepare message */ sendCpMsg.cpId = pendId[0]; sendCpMsg.cpPos = pendSent[0]; }; /* send checkpoint req */ send!sendCpMsg; recvCpAck = true; goto ACTIVE; :: else; if :: recv?recvMsg; goto ACTIVE_RECV; :: send!sendDataMsg; /* sent data */ sent++; if :: sent >= bufTrigger && recvCpAck == false; /* create checkpoint */ d_step { /* move the reference points */ sent = sent - pendSent[0]; recd = recd - pendRecd[0]; /* overwrite old acked and update absolute counters */ ackedId[0] = pendId[0]; ackedSent[0] = ((ackedSent[0] + pendSent[0]) % wrap); ackedRecd[0] = ((ackedRecd[0] + pendRecd[0]) % wrap); /* new pend */ pendId[0] = (pendId[0] + 1) % 3; pendSent[0] = sent; pendRecd[0] = recd; /* prepare message */ sendCpMsg.cpId = pendId[0]; sendCpMsg.cpPos = pendSent[0]; }; /* send checkpoint req */ send!sendCpMsg; recvCpAck = true; goto ACTIVE; :: else; goto ACTIVE; fi; fi; fi; ACTIVE_RECV: if :: recvMsg.type == data; /* received data */ recd++; if :: recd >= bufTrigger && recvCpAck == false; /* create checkpoint */ d_step { /* move the reference points */ sent = sent - pendSent[0]; recd = recd - pendRecd[0]; /* overwrite old acked and update absolute counters */ ackedId[0] = pendId[0]; ackedSent[0] = ((ackedSent[0] + pendSent[0]) % wrap); ackedRecd[0] = ((ackedRecd[0] + pendRecd[0]) % wrap); /* new pend */ pendId[0] = (pendId[0] + 1) % 3; pendSent[0] = sent; pendRecd[0] = recd; /* prepare message */ sendCpMsg.cpId = pendId[0]; sendCpMsg.cpPos = pendSent[0]; }; /* send checkpoint req */ send!sendCpMsg; recvCpAck = true; goto ACTIVE; :: else; goto ACTIVE; fi; :: recvMsg.type == cp && recvCpAck == true && recvMsg.cpId == pendId[0]; /* this is a reply */ d_step { pendRecd[0] = recvMsg.cpPos; recvCpAck = false; }; goto ACTIVE; :: else; assert(false); fi; } proctype Noninitiator(chan recv, send) { /* send and receive counters */ byte sent = 0; byte recd = 0; /* checkpoints */ ackedId[1] = 0; ackedSent[1] = 0; ackedRecd[1] = 0; pendId[1] = 1; pendSent[1] = 0; pendRecd[1] = 0; ready[1] = true; /* messages */ dataMsg recvMsg; dataMsg sendDataMsg; sendDataMsg.type = data; sendDataMsg.cpId = 0; sendDataMsg.cpPos = 0; dataMsg sendCpMsg; sendCpMsg.type = cp; ACTIVE: assert(sent <= bufTrigger*bufFactor && recd <= bufTrigger*bufFactor); if :: sent == bufTrigger*bufFactor || recd == bufTrigger*bufFactor; recv?recvMsg; goto ACTIVE_RECV; :: else; if :: recv?recvMsg; goto ACTIVE_RECV; :: send!sendDataMsg; /* sent data */ sent++; goto ACTIVE; fi; fi; ACTIVE_RECV: if :: recvMsg.type == data; /* received data */ recd++; goto ACTIVE; :: recvMsg.type == cp && recvMsg.cpId == (pendId[1] + 1) % 3; /* this is a request, update */ d_step { sent = sent - pendSent[1]; recd = recd - pendRecd[1]; /* overwrite old acked and update absolute counters */ ackedId[1] = pendId[1]; ackedSent[1] = ((ackedSent[1] + pendSent[1]) % wrap); ackedRecd[1] = ((ackedRecd[1] + pendRecd[1]) % wrap); /* new pend */ pendId[1] = recvMsg.cpId; pendSent[1] = sent; pendRecd[1] = recvMsg.cpPos; /* prepare message */ sendCpMsg.cpId = pendId[1]; sendCpMsg.cpPos = pendSent[1]; }; /* send checkpoint ack */ send!sendCpMsg; goto ACTIVE; :: else; assert(false); fi; }