#define bufTrigger 3 #define bufFactor 2 #define wrap 13 #define queueSize 2 mtype = {data,cp}; typedef dataMsg { mtype type; byte cpId; byte cpSent; byte cpRecd; } /* checkpoints for endpoints */ byte ackedId[2]; byte ackedSent[2]; byte ackedRecd[2]; byte pendId[2]; byte pendSent[2]; byte pendRecd[2]; byte ready[2]; init { /* channels */ chan point1Recv = [queueSize] of {dataMsg}; chan point2Recv = [queueSize] of {dataMsg}; /* start processes */ run Endpoint(point1Recv, point2Recv, 0); run Endpoint(point2Recv, point1Recv, 1); } proctype Endpoint(chan recv, send; bit id) { /* send and receive counters */ byte sent = 0; byte recd = 0; /* checkpoints */ ackedId[id] = 0; ackedSent[id] = 0; ackedRecd[id] = 0; pendId[id] = 1; pendSent[id] = 0; pendRecd[id] = 0; ready[id] = true; /* indicate whether a reply is expected */ bool recvCp = false; /* messages */ dataMsg recvMsg; dataMsg sendDataMsg; sendDataMsg.type = data; sendDataMsg.cpId = 0; sendDataMsg.cpSent = 0; sendDataMsg.cpRecd = 0; dataMsg sendCpMsg; sendCpMsg.type = cp; ACTIVE: assert(sent <= bufTrigger*bufFactor && recd <= bufTrigger*bufFactor); if :: sent == bufTrigger*bufFactor && recvCp == true; recv?recvMsg; goto ACTIVE_RECV; :: sent == bufTrigger*bufFactor && recvCp == false; /* create checkpoint */ d_step { /* move the reference points */ sent = sent - pendSent[id]; recd = recd - pendRecd[id]; /* overwrite old acked and update absolute counters */ ackedId[id] = pendId[id]; ackedSent[id] = ((ackedSent[id] + pendSent[id]) % wrap); ackedRecd[id] = ((ackedRecd[id] + pendRecd[id]) % wrap); /* new pend */ pendId[id] = (pendId[id] + 1) % 3; pendSent[id] = sent; pendRecd[id] = recd; /* prepare message */ sendCpMsg.cpId = pendId[id]; sendCpMsg.cpSent = pendSent[id]; sendCpMsg.cpRecd = pendRecd[id]; }; /* send cp request */ send!sendCpMsg; recvCp = true; goto ACTIVE; :: else; if :: recv?recvMsg; goto ACTIVE_RECV; :: send!sendDataMsg; /* sent data */ sent++; if :: sent >= bufTrigger && recvCp == false; /* create checkpoint */ d_step { /* move the reference points */ sent = sent - pendSent[id]; recd = recd - pendRecd[id]; /* overwrite old acked and update absolute counters */ ackedId[id] = pendId[id]; ackedSent[id] = ((ackedSent[id] + pendSent[id]) % wrap); ackedRecd[id] = ((ackedRecd[id] + pendRecd[id]) % wrap); /* new pend */ pendId[id] = (pendId[id] + 1) % 3; pendSent[id] = sent; pendRecd[id] = recd; /* prepare message */ sendCpMsg.cpId = pendId[id]; sendCpMsg.cpSent = pendSent[id]; sendCpMsg.cpRecd = pendRecd[id]; }; /* send cp request */ send!sendCpMsg; recvCp = true; goto ACTIVE; :: else; goto ACTIVE; fi; fi; fi; ACTIVE_RECV: if :: recvMsg.type == data; /* received data */ recd++; goto ACTIVE; :: recvMsg.type == cp && recvCp == true && recvMsg.cpId == pendId[id]; /* this is a reply, take maximum */ d_step { if :: recvMsg.cpSent > pendRecd[id]; pendRecd[id] = recvMsg.cpSent; :: else; fi; recvCp = false; }; goto ACTIVE; :: recvMsg.type == cp && recvCp == false && recvMsg.cpId == (pendId[id] + 1) % 3; /* this is a request, update */ d_step { sent = sent - pendSent[id]; recd = recd - pendRecd[id]; /* overwrite old acked and update absolute counters */ ackedId[id] = pendId[id]; ackedSent[id] = ((ackedSent[id] + pendSent[id]) % wrap); ackedRecd[id] = ((ackedRecd[id] + pendRecd[id]) % wrap); /* new pend */ pendId[id] = recvMsg.cpId; pendSent[id] = recvMsg.cpRecd; pendRecd[id] = recvMsg.cpSent; /* prepare message */ sendCpMsg.cpId = pendId[id]; sendCpMsg.cpSent = pendSent[id]; sendCpMsg.cpRecd = pendRecd[id]; }; /* send cp reply */ send!sendCpMsg; goto ACTIVE; :: else; assert(false); fi; }