347 lines
6.7 KiB
Modula-2
347 lines
6.7 KiB
Modula-2
(*$R-*)
|
|
IMPLEMENTATION MODULE CSP;
|
|
(*
|
|
Module: Communicating Sequential Processes
|
|
From: "A Modula-2 Implementation of CSP",
|
|
M. Collado, R. Morales, J.J. Moreno,
|
|
SIGPlan Notices, Volume 22, Number 6, June 1987.
|
|
Some modifications by Ceriel J.H. Jacobs
|
|
Version: $Id$
|
|
|
|
See this article for an explanation of the use of this module.
|
|
*)
|
|
|
|
FROM random IMPORT Uniform;
|
|
FROM SYSTEM IMPORT BYTE, ADDRESS, NEWPROCESS, TRANSFER;
|
|
FROM Storage IMPORT Allocate, Deallocate;
|
|
FROM Traps IMPORT Message;
|
|
|
|
CONST WorkSpaceSize = 2000;
|
|
|
|
TYPE ByteAddress = POINTER TO BYTE;
|
|
Channel = POINTER TO ChannelDescriptor;
|
|
ProcessType = POINTER TO ProcessDescriptor;
|
|
ProcessDescriptor = RECORD
|
|
next: ProcessType;
|
|
father: ProcessType;
|
|
cor: ADDRESS;
|
|
wsp: ADDRESS;
|
|
guardindex: INTEGER;
|
|
guardno: CARDINAL;
|
|
guardcount: CARDINAL;
|
|
opened: Channel;
|
|
sons: CARDINAL;
|
|
msgadr: ADDRESS;
|
|
msglen: CARDINAL;
|
|
END;
|
|
|
|
Queue = RECORD
|
|
head, tail: ProcessType;
|
|
END;
|
|
|
|
ChannelDescriptor = RECORD
|
|
senders: Queue;
|
|
owner: ProcessType;
|
|
guardindex: INTEGER;
|
|
next: Channel;
|
|
END;
|
|
|
|
VAR cp: ProcessType;
|
|
free, ready: Queue;
|
|
|
|
(* ------------ Private modules and procedures ------------- *)
|
|
|
|
MODULE ProcessQueue;
|
|
|
|
IMPORT ProcessType, Queue;
|
|
EXPORT Push, Pop, InitQueue, IsEmpty;
|
|
|
|
PROCEDURE InitQueue(VAR q: Queue);
|
|
BEGIN
|
|
WITH q DO
|
|
head := NIL;
|
|
tail := NIL
|
|
END
|
|
END InitQueue;
|
|
|
|
PROCEDURE Push(p: ProcessType; VAR q: Queue);
|
|
BEGIN
|
|
p^.next := NIL;
|
|
WITH q DO
|
|
IF head = NIL THEN
|
|
tail := p
|
|
ELSE
|
|
head^.next := p
|
|
END;
|
|
head := p
|
|
END
|
|
END Push;
|
|
|
|
PROCEDURE Pop(VAR q: Queue; VAR p: ProcessType);
|
|
BEGIN
|
|
WITH q DO
|
|
p := tail;
|
|
IF p # NIL THEN
|
|
tail := tail^.next;
|
|
IF head = p THEN
|
|
head := NIL
|
|
END
|
|
END
|
|
END
|
|
END Pop;
|
|
|
|
PROCEDURE IsEmpty(q: Queue): BOOLEAN;
|
|
BEGIN
|
|
RETURN q.head = NIL
|
|
END IsEmpty;
|
|
|
|
END ProcessQueue;
|
|
|
|
|
|
PROCEDURE DoTransfer;
|
|
VAR aux: ProcessType;
|
|
BEGIN
|
|
aux := cp;
|
|
Pop(ready, cp);
|
|
IF cp = NIL THEN
|
|
HALT
|
|
ELSE
|
|
TRANSFER(aux^.cor, cp^.cor)
|
|
END
|
|
END DoTransfer;
|
|
|
|
PROCEDURE OpenChannel(ch: Channel; n: INTEGER);
|
|
BEGIN
|
|
WITH ch^ DO
|
|
IF guardindex = 0 THEN
|
|
guardindex := n;
|
|
next := cp^.opened;
|
|
cp^.opened := ch
|
|
END
|
|
END
|
|
END OpenChannel;
|
|
|
|
PROCEDURE CloseChannels(p: ProcessType);
|
|
BEGIN
|
|
WITH p^ DO
|
|
WHILE opened # NIL DO
|
|
opened^.guardindex := 0;
|
|
opened := opened^.next
|
|
END
|
|
END
|
|
END CloseChannels;
|
|
|
|
PROCEDURE ThereAreOpenChannels(): BOOLEAN;
|
|
BEGIN
|
|
RETURN cp^.opened # NIL;
|
|
END ThereAreOpenChannels;
|
|
|
|
PROCEDURE Sending(ch: Channel): BOOLEAN;
|
|
BEGIN
|
|
RETURN NOT IsEmpty(ch^.senders)
|
|
END Sending;
|
|
|
|
(* -------------- Public Procedures ----------------- *)
|
|
|
|
PROCEDURE COBEGIN;
|
|
(* Beginning of a COBEGIN .. COEND structure *)
|
|
BEGIN
|
|
END COBEGIN;
|
|
|
|
PROCEDURE COEND;
|
|
(* End of a COBEGIN .. COEND structure *)
|
|
(* VAR aux: ProcessType; *)
|
|
BEGIN
|
|
IF cp^.sons > 0 THEN
|
|
DoTransfer
|
|
END
|
|
END COEND;
|
|
|
|
PROCEDURE StartProcess(P: PROC);
|
|
(* Start an anonimous process that executes the procedure P *)
|
|
VAR newprocess: ProcessType;
|
|
BEGIN
|
|
Pop(free, newprocess);
|
|
IF newprocess = NIL THEN
|
|
Allocate(newprocess,SIZE(ProcessDescriptor));
|
|
Allocate(newprocess^.wsp, WorkSpaceSize)
|
|
END;
|
|
WITH newprocess^ DO
|
|
father := cp;
|
|
sons := 0;
|
|
msglen := 0;
|
|
NEWPROCESS(P, wsp, WorkSpaceSize, cor)
|
|
END;
|
|
cp^.sons := cp^.sons + 1;
|
|
Push(newprocess, ready)
|
|
END StartProcess;
|
|
|
|
PROCEDURE StopProcess;
|
|
(* Terminate a Process (itself) *)
|
|
VAR aux: ProcessType;
|
|
BEGIN
|
|
aux := cp^.father;
|
|
aux^.sons := aux^.sons - 1;
|
|
IF aux^.sons = 0 THEN
|
|
Push(aux, ready)
|
|
END;
|
|
aux := cp;
|
|
Push(aux, free);
|
|
Pop(ready, cp);
|
|
IF cp = NIL THEN
|
|
HALT
|
|
ELSE
|
|
TRANSFER(aux^.cor, cp^.cor)
|
|
END
|
|
END StopProcess;
|
|
|
|
PROCEDURE InitChannel(VAR ch: Channel);
|
|
(* Initialize the channel ch *)
|
|
BEGIN
|
|
Allocate(ch, SIZE(ChannelDescriptor));
|
|
WITH ch^ DO
|
|
InitQueue(senders);
|
|
owner := NIL;
|
|
next := NIL;
|
|
guardindex := 0
|
|
END
|
|
END InitChannel;
|
|
|
|
PROCEDURE GetChannel(ch: Channel);
|
|
(* Assign the channel ch to the process that gets it *)
|
|
BEGIN
|
|
WITH ch^ DO
|
|
IF owner # NIL THEN
|
|
Message("Channel already has an owner");
|
|
HALT
|
|
END;
|
|
owner := cp
|
|
END
|
|
END GetChannel;
|
|
|
|
PROCEDURE Send(data: ARRAY OF BYTE; VAR ch: Channel);
|
|
(* Send a message with the data to the cvhannel ch *)
|
|
VAR m: ByteAddress;
|
|
(* aux: ProcessType; *)
|
|
i: CARDINAL;
|
|
BEGIN
|
|
WITH ch^ DO
|
|
Push(cp, senders);
|
|
Allocate(cp^.msgadr, SIZE(data));
|
|
m := cp^.msgadr;
|
|
cp^.msglen := HIGH(data);
|
|
FOR i := 0 TO HIGH(data) DO
|
|
m^ := data[i];
|
|
m := ADDRESS(m) + 1
|
|
END;
|
|
IF guardindex # 0 THEN
|
|
owner^.guardindex := guardindex;
|
|
CloseChannels(owner);
|
|
Push(owner, ready)
|
|
END
|
|
END;
|
|
DoTransfer
|
|
END Send;
|
|
|
|
PROCEDURE Receive(VAR ch: Channel; VAR dest: ARRAY OF BYTE);
|
|
(* Receive a message from the channel ch into the dest variable *)
|
|
VAR aux: ProcessType;
|
|
m: ByteAddress;
|
|
i: CARDINAL;
|
|
BEGIN
|
|
WITH ch^ DO
|
|
IF cp # owner THEN
|
|
Message("Only owner of channel can receive from it");
|
|
HALT
|
|
END;
|
|
IF Sending(ch) THEN
|
|
Pop(senders, aux);
|
|
m := aux^.msgadr;
|
|
FOR i := 0 TO aux^.msglen DO
|
|
dest[i] := m^;
|
|
m := ADDRESS(m) + 1
|
|
END;
|
|
Push(aux, ready);
|
|
Push(cp, ready);
|
|
CloseChannels(cp)
|
|
ELSE
|
|
OpenChannel(ch, -1);
|
|
DoTransfer;
|
|
Pop(senders, aux);
|
|
m := aux^.msgadr;
|
|
FOR i := 0 TO aux^.msglen DO
|
|
dest[i] := m^;
|
|
m := ADDRESS(m) + 1
|
|
END;
|
|
Push(cp, ready);
|
|
Push(aux, ready)
|
|
END;
|
|
Deallocate(aux^.msgadr, aux^.msglen+1);
|
|
DoTransfer
|
|
END
|
|
END Receive;
|
|
|
|
PROCEDURE SELECT(n: CARDINAL);
|
|
(* Beginning of a SELECT structure with n guards *)
|
|
BEGIN
|
|
cp^.guardindex := Uniform(1,n);
|
|
cp^.guardno := n;
|
|
cp^.guardcount := n
|
|
END SELECT;
|
|
|
|
PROCEDURE NEXTGUARD(): CARDINAL;
|
|
(* Returns an index to the next guard to be evaluated in a SELECT *)
|
|
BEGIN
|
|
RETURN cp^.guardindex
|
|
END NEXTGUARD;
|
|
|
|
PROCEDURE GUARD(cond: BOOLEAN; ch: Channel;
|
|
VAR dest: ARRAY OF BYTE): BOOLEAN;
|
|
(* Evaluates a guard, including reception management *)
|
|
(* VAR aux: ProcessType; *)
|
|
BEGIN
|
|
IF NOT cond THEN
|
|
RETURN FALSE
|
|
ELSIF ch = NIL THEN
|
|
CloseChannels(cp);
|
|
cp^.guardindex := 0;
|
|
RETURN TRUE
|
|
ELSIF Sending(ch) THEN
|
|
Receive(ch, dest);
|
|
cp^.guardindex := 0;
|
|
RETURN TRUE
|
|
ELSE
|
|
OpenChannel(ch, cp^.guardindex);
|
|
RETURN FALSE
|
|
END
|
|
END GUARD;
|
|
|
|
PROCEDURE ENDSELECT(): BOOLEAN;
|
|
(* End of a SELECT structure *)
|
|
BEGIN
|
|
WITH cp^ DO
|
|
IF guardindex <= 0 THEN
|
|
RETURN TRUE
|
|
END;
|
|
guardcount := guardcount - 1;
|
|
IF guardcount # 0 THEN
|
|
guardindex := (guardindex MOD INTEGER(guardno)) + 1
|
|
ELSIF ThereAreOpenChannels() THEN
|
|
DoTransfer
|
|
ELSE
|
|
guardindex := 0
|
|
END
|
|
END;
|
|
RETURN FALSE
|
|
END ENDSELECT;
|
|
|
|
BEGIN
|
|
InitQueue(free);
|
|
InitQueue(ready);
|
|
Allocate(cp,SIZE(ProcessDescriptor));
|
|
WITH cp^ DO
|
|
sons := 0;
|
|
father := NIL
|
|
END
|
|
END CSP.
|
|
|