ack/lang/m2/libm2/CSP.mod

344 lines
6.5 KiB
Modula-2
Raw Normal View History

IMPLEMENTATION MODULE CSP;
(* From
"A Modula-2 Implementation of CSP",
M. Collado, R. Morales, J.J. Moreno,
SIGPlan Notices, Volume 22, Number 6, June 1987.
See this article for an explanation of the use of this module.
*)
FROM random IMPORT Uniform;
FROM SYSTEM IMPORT BYTE, TSIZE, ADDRESS, ADR, NEWPROCESS, TRANSFER;
FROM Storage IMPORT ALLOCATE, DEALLOCATE;
IMPORT Traps;
CONST WorkSpaceSize = 1000;
TYPE ByteAddress = POINTER TO BYTE;
Channel = POINTER TO ChannelDescriptor;
ProcessType = POINTER TO ProcessDescriptor;
ProcessDescriptor = RECORD
next: ProcessType;
father: ProcessType;
cor: ADDRESS;
wsp: ADDRESS;
guardindex: INTEGER;
guardno: CARDINAL;
guardcount: CARDINAL;
opened: Channel;
sons: CARDINAL;
msgadr: ADDRESS;
msglen: CARDINAL;
END;
Queue = RECORD
head, tail: ProcessType;
END;
ChannelDescriptor = RECORD
senders: Queue;
owner: ProcessType;
guardindex: INTEGER;
next: Channel;
END;
VAR cp: ProcessType;
free, ready: Queue;
(* ------------ Private modules and procedures ------------- *)
MODULE ProcessQueue;
IMPORT ProcessType, Queue;
EXPORT Push, Pop, InitQueue, IsEmpty;
PROCEDURE InitQueue(VAR q: Queue);
BEGIN
WITH q DO
head := NIL;
tail := NIL
END
END InitQueue;
PROCEDURE Push(p: ProcessType; VAR q: Queue);
BEGIN
p^.next := NIL;
WITH q DO
IF head = NIL THEN
tail := p
ELSE
head^.next := p
END;
head := p
END
END Push;
PROCEDURE Pop(VAR q: Queue; VAR p: ProcessType);
BEGIN
WITH q DO
p := tail;
IF p # NIL THEN
tail := tail^.next;
IF head = p THEN
head := NIL
END
END
END
END Pop;
PROCEDURE IsEmpty(q: Queue): BOOLEAN;
BEGIN
RETURN q.head = NIL
END IsEmpty;
END ProcessQueue;
PROCEDURE DoTransfer;
VAR aux: ProcessType;
BEGIN
aux := cp;
Pop(ready, cp);
IF cp = NIL THEN
HALT
ELSE
TRANSFER(aux^.cor, cp^.cor)
END
END DoTransfer;
PROCEDURE OpenChannel(ch: Channel; n: INTEGER);
BEGIN
WITH ch^ DO
IF guardindex = 0 THEN
guardindex := n;
next := cp^.opened;
cp^.opened := ch
END
END
END OpenChannel;
PROCEDURE CloseChannels(p: ProcessType);
BEGIN
WITH p^ DO
WHILE opened # NIL DO
opened^.guardindex := 0;
opened := opened^.next
END
END
END CloseChannels;
PROCEDURE ThereAreOpenChannels(): BOOLEAN;
BEGIN
RETURN cp^.opened # NIL;
END ThereAreOpenChannels;
PROCEDURE Sending(ch: Channel): BOOLEAN;
BEGIN
RETURN NOT IsEmpty(ch^.senders)
END Sending;
(* -------------- Public Procedures ----------------- *)
PROCEDURE COBEGIN;
(* Beginning of a COBEGIN .. COEND structure *)
BEGIN
END COBEGIN;
PROCEDURE COEND;
(* End of a COBEGIN .. COEND structure *)
VAR aux: ProcessType;
BEGIN
IF cp^.sons > 0 THEN
DoTransfer
END
END COEND;
PROCEDURE StartProcess(P: PROC);
(* Start an anonimous process that executes the procedure P *)
VAR newprocess: ProcessType;
BEGIN
Pop(free, newprocess);
IF newprocess = NIL THEN
NEW(newprocess);
ALLOCATE(newprocess^.wsp, WorkSpaceSize)
END;
WITH newprocess^ DO
father := cp;
sons := 0;
msglen := 0;
NEWPROCESS(P, wsp, WorkSpaceSize, cor)
END;
cp^.sons := cp^.sons + 1;
Push(newprocess, ready)
END StartProcess;
PROCEDURE StopProcess;
(* Terminate a Process (itself) *)
VAR aux: ProcessType;
BEGIN
aux := cp^.father;
aux^.sons := aux^.sons - 1;
IF aux^.sons = 0 THEN
Push(aux, ready)
END;
aux := cp;
Push(aux, free);
Pop(ready, cp);
IF cp = NIL THEN
HALT
ELSE
TRANSFER(aux^.cor, cp^.cor)
END
END StopProcess;
PROCEDURE InitChannel(VAR ch: Channel);
(* Initialize the channel ch *)
BEGIN
NEW(ch);
WITH ch^ DO
InitQueue(senders);
owner := NIL;
next := NIL;
guardindex := 0
END
END InitChannel;
PROCEDURE GetChannel(ch: Channel);
(* Assign the channel ch to the process that gets it *)
BEGIN
WITH ch^ DO
IF owner # NIL THEN
Traps.Message("Channel already has an owner");
HALT
END;
owner := cp
END
END GetChannel;
PROCEDURE Send(data: ARRAY OF BYTE; VAR ch: Channel);
(* Send a message with the data to the cvhannel ch *)
VAR m: ByteAddress;
aux: ProcessType;
i: CARDINAL;
BEGIN
WITH ch^ DO
Push(cp, senders);
ALLOCATE(cp^.msgadr, SIZE(data));
m := cp^.msgadr;
cp^.msglen := HIGH(data);
FOR i := 0 TO HIGH(data) DO
m^ := data[i];
m := ADDRESS(m) + 1
END;
IF guardindex # 0 THEN
owner^.guardindex := guardindex;
CloseChannels(owner);
Push(owner, ready)
END
END;
DoTransfer
END Send;
PROCEDURE Receive(VAR ch: Channel; VAR dest: ARRAY OF BYTE);
(* Receive a message from the channel ch into the dest variable *)
VAR aux: ProcessType;
m: ByteAddress;
i: CARDINAL;
BEGIN
WITH ch^ DO
IF cp # owner THEN
Traps.Message("Only owner of channel can receive from it");
HALT
END;
IF Sending(ch) THEN
Pop(senders, aux);
m := aux^.msgadr;
FOR i := 0 TO aux^.msglen DO
dest[i] := m^;
m := ADDRESS(m) + 1
END;
Push(aux, ready);
Push(cp, ready);
CloseChannels(cp)
ELSE
OpenChannel(ch, -1);
DoTransfer;
Pop(senders, aux);
m := aux^.msgadr;
FOR i := 0 TO aux^.msglen DO
dest[i] := m^;
m := ADDRESS(m) + 1
END;
Push(cp, ready);
Push(aux, ready)
END;
DEALLOCATE(aux^.msgadr, aux^.msglen+1);
DoTransfer
END
END Receive;
PROCEDURE SELECT(n: CARDINAL);
(* Beginning of a SELECT structure with n guards *)
BEGIN
cp^.guardindex := Uniform(1,n);
cp^.guardno := n;
cp^.guardcount := n
END SELECT;
PROCEDURE NEXTGUARD(): CARDINAL;
(* Returns an index to the next guard to be evaluated in a SELECT *)
BEGIN
RETURN cp^.guardindex
END NEXTGUARD;
PROCEDURE GUARD(cond: BOOLEAN; ch: Channel;
VAR dest: ARRAY OF BYTE): BOOLEAN;
(* Evaluates a guard, including reception management *)
VAR aux: ProcessType;
BEGIN
IF NOT cond THEN
RETURN FALSE
ELSIF ch = NIL THEN
CloseChannels(cp);
cp^.guardindex := 0;
RETURN TRUE
ELSIF Sending(ch) THEN
Receive(ch, dest);
cp^.guardindex := 0;
RETURN TRUE
ELSE
OpenChannel(ch, cp^.guardindex);
RETURN FALSE
END
END GUARD;
PROCEDURE ENDSELECT(): BOOLEAN;
(* End of a SELECT structure *)
BEGIN
WITH cp^ DO
IF guardindex <= 0 THEN
RETURN TRUE
END;
guardcount := guardcount - 1;
IF guardcount # 0 THEN
guardindex := (guardindex MOD INTEGER(guardno)) + 1
ELSIF ThereAreOpenChannels() THEN
DoTransfer
ELSE
guardindex := 0
END
END;
RETURN FALSE
END ENDSELECT;
BEGIN
InitQueue(free);
InitQueue(ready);
NEW(cp);
WITH cp^ DO
sons := 0;
father := NIL
END
END CSP.