INTRO
The bpe module provides a subset of the BPMN 2.0 API for controlling processes with parallel gateways, hierarchical traces, and scheduling. BPE also supports the BPMN XML format out of the box and can be used with the Camunda BPMN modeler.
SERVICE
load(procId()) -> #process{}
Loads the process definition and current state from the database (via KVS). Wakes up process definition if not previously initialized in storage.
ProcState = bpe:load(<<"proc-123">>).
start(#process{}, list()) -> {ok, procId()} | {error, any()}
Starts the process in the runtime with no process group. Restores/loads process details. Matches against the process ID to ensure a unique runtime gen_server registration.
{ok, ProcId} = bpe:start(ProcState, [{notification, self()}]).
start(#process{}, list(), {list(), #procRec{}}) -> {ok, procId()} | {error, any()}
Starts the process under a specific process group monitor (a supervised monitor group). Used to link process execution to roles and monitors.
{ok, ProcId} = bpe:start(ProcState, [], {Monitor, ProcRec}).
delete(procId()) -> #process{}
Terminates the process gen_server and removes it from Mnesia active process lists (/bpe/proc) and parent monitor maps, appending its state as "deleted" to the "/bpe/deleted" history feed.
DeletedProc = bpe:delete(<<"proc-123">>).
CONTEXT & EXECUTION
proc(procId()) -> #process{}
Retrieves the current process state from the running gen_server. If the process is not online (persisted offline), it will automatically start the process session first.
State = bpe:proc(<<"proc-123">>).
update(procId(), #process{}) -> {reply, #process{}, #process{}} | {exit, normal} | {error, any()}
Updates/sets the process state inside the running process gen_server.
bpe:update(<<"proc-123">>, NewState).
persist(procId(), #process{}) -> {reply, #process{}, #process{}} | {exit, normal} | {error, any()}
Updates the process state and commits it directly to the database storage.
bpe:persist(<<"proc-123">>, NewState).
complete(procId()) -> {complete, any()} | {error, any()}
Invokes BPMN 1.0 process scheduler tick to complete the current task and execute the next targets directly without sequenceFlow evaluation.
{complete, NextTask} = bpe:complete(<<"proc-123">>).
complete(procId(), list()) -> {complete, any()} | {error, any()}
Invokes BPMN 1.0 scheduler tick, targeting a specific flow sequence / stage list.
bpe:complete(<<"proc-123">>, <<"TargetTask">>).
next(procId()) -> {complete, any()} | {error, any()}
Invokes BPMN 2.0 process scheduler tick evaluating current flows and gating criteria based on sequenceFlow records.
{complete, NextFlow} = bpe:next(<<"proc-123">>).
next(procId(), any()) -> {complete, any()} | {error, any()}
Invokes BPMN 2.0 process scheduler tick on a particular flow/stage identifier.
bpe:next(<<"proc-123">>, <<"PaymentFlow">>).
amend(procId(), tuple() | list()) -> {complete, any()} | {error, any()}
Appends one or more documents/records to the process context environment and triggers a BPMN 2.0 flow evaluation (calls next/1).
bpe:amend(<<"proc-123">>, #payment{amount = 100}).
discard(procId(), tuple() | list()) -> {complete, any()} | {error, any()}
Removes specified documents/records matching the pattern from the process environment, then triggers a BPMN 2.0 flow evaluation.
bpe:discard(<<"proc-123">>, #payment{amount = 100}).
modify(procId(), tuple() | list(), append | remove) -> {complete, any()} | {error, any()}
Modifies the process document environment by appending or removing records and triggers the task evaluation without advancing the sequence flow pointer.
bpe:modify(<<"proc-123">>, #payment{amount = 100}, append).
messageEvent(procId(), #messageEvent{}) -> {complete, any()} | {error, any()}
Sends a synchronous message event directly to the process runtime. Triggers the action/2 callback.
bpe:messageEvent(
    <<"proc-123">>,
    #messageEvent{
        name = <<"userClick">>,
        payload = <<"ok">>
    }
).
asyncEvent(procId(), #asyncEvent{}) -> ok | {error, any()}
Sends an asynchronous event (cast) to the running process gen_server.
bpe:asyncEvent(
    <<"proc-123">>,
    #asyncEvent{name = <<"timeout">>}
).
HISTORY & INSPECTION
hist(procId()) -> list(#hist{})
Retrieves the full execution history trace records for the specified process.
HistoryList = bpe:hist(<<"proc-123">>).
sched(procId()) -> list(#sched{})
Retrieves all scheduler steps, execution pointers, and thread state traces.
SchedList = bpe:sched(<<"proc-123">>).
tasks(#process{}) -> list()
Retrieves the list of defined tasks from the process definition record.
flows(#process{}) -> list()
Retrieves the list of sequenceFlows from the process definition record.
events(#process{}) -> list()
Retrieves the list of events from the process definition record.
docs(#process{}) -> list()
Retrieves the list of documents currently inside the process environment.
task(list(), #process{}) -> #task{} | tuple()
Retrieves a specific task configuration record from the process definition by its ID.
doc(tuple(), #process{}) -> list(tuple())
Queries process documents matching a record template structure pattern.
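The template query described for doc/2 can be pictured with a small standalone model. This is only a sketch, not BPE's implementation: the module doc_sketch, the #proc{} record, and the matching rule (same record name and same arity) are assumptions made for illustration, and the real matching rule may be stricter.

```erlang
-module(doc_sketch).
-export([doc/2, demo/0]).

%% Hypothetical stand-in for #process{}: only the document list matters here.
-record(proc, {docs = []}).

%% Assumed rule: a document matches when its record name (first element)
%% and its arity equal those of the template.
doc(Template, #proc{docs = Docs})
  when is_tuple(Template), tuple_size(Template) > 0 ->
    Tag = element(1, Template),
    Size = tuple_size(Template),
    [D || D <- Docs,
          is_tuple(D),
          tuple_size(D) =:= Size,
          element(1, D) =:= Tag].

%% Three documents in the environment, two of them payments.
demo() ->
    Proc = #proc{docs = [{payment, 100}, {client, <<"Alice">>}, {payment, 250}]},
    doc({payment, undefined}, Proc).
```

Calling doc_sketch:demo() keeps the two payment tuples and drops the client tuple.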
CONDITIONAL FLOW EXECUTION
BPE supports two execution paradigms represented by two distinct scheduler ticks:
- BPMN 1.0 (complete): Task-centric execution. Calling complete/1,2 queries the history to find the current active task (vertex) name and resolves sequence flow connections where the current task is the source. It directly fires the task action callback: Module:action({request, Src, Target}, Proc). It does not register or update #sched{} flow tracking pointers, making it suitable for simple direct-line flows. Note: This legacy API is preserved for the PrivatBank customer.
- BPMN 2.0 (next): Flow-centric execution. Calling next/1,2 tracks execution using thread queues stored in #sched{} records. It inspects active sequenceFlow pointers, verifies authorization rules, executes callbacks, and processes gateways (parallel, exclusive, inclusive) by evaluating flow conditions dynamically. Note: This is the new BPMN 2.0 API developed for the Ministry of Internal Affairs (SE "INFOTECH").
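The task-centric lookup used by the BPMN 1.0 tick (find every flow whose source is the current task) can be sketched as a filter over the flow list. The module flow_sketch and the simplified #flow{} record are hypothetical and stand in for BPE's own sequenceFlow records; callbacks, authorization, and gateway handling are left out.

```erlang
-module(flow_sketch).
-export([outgoing/2, demo/0]).

%% Hypothetical, simplified flow record used only in this sketch.
-record(flow, {name, source, target}).

%% Return the targets of every flow that starts at the current task.
outgoing(Current, Flows) ->
    [F#flow.target || F <- Flows, F#flow.source =:= Current].

%% A tiny definition: Created -> Init, then Init -> Payment | Rejected.
demo() ->
    Flows = [#flow{name = f1, source = <<"Created">>, target = <<"Init">>},
             #flow{name = f2, source = <<"Init">>, target = <<"Payment">>},
             #flow{name = f3, source = <<"Init">>, target = <<"Rejected">>}],
    outgoing(<<"Init">>, Flows).
```

With the current task set to Init, the sketch yields both candidate targets; choosing among them is where the two ticks differ, since the flow-centric tick additionally evaluates conditions and tracks pointers in #sched{} records.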
LOCKING MECHANISM
To avoid race conditions and double-execution issues during concurrent gen_server startup and request handling, BPE implements a strict coordination locking protocol:
- Mnesia Locks (terminateLock): Each client call (such as proc, next, complete, or messageEvent) generates a monotonic, positive integer lock ID and stores a #terminateLock{id = Id, pid = ProcId} record in Mnesia via KVS.
- Concurrency & Termination Coordination: When a call completes, BPE executes bpe_proc:terminate_check/5. It checks the number of pending locks for the process:
  - If the lock count exceeds the limit (default: 500), BPE terminates the gen_server with an error to prevent deadlocks.
  - If the process is returning a stop status but there are other pending client calls registered in Mnesia, BPE bypasses immediate termination and converts the exit command to a normal reply/noreply. This keeps the gen_server alive to drain the remaining queue.
  - When the last lock is cleared and the process stops, it registers itself in the terminateLocks ETS cache.
- ETS Cache Blocks: When starting a process session (via bpe:start/2), it checks the ETS cache terminateLocks for the process ID. If a process is in the middle of terminating, it blocks and waits for a 'DOWN' monitor message before starting a new gen_server instance.
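The three termination rules above can be summarized as a pure decision function. This is a minimal sketch under stated assumptions, not the body of bpe_proc:terminate_check/5: the module lock_sketch, the function decide/3, its return atoms, and the interpretation of Pending as the number of locks still held by other callers are all hypothetical.

```erlang
-module(lock_sketch).
-export([decide/3, demo/0]).

%% Default limit quoted in the text above.
-define(LIMIT, 500).

%% decide(Status, Pending, Limit):
%%   Status  - stop | continue, what the callback wants to do (assumed shape)
%%   Pending - locks still held by other client calls (assumed meaning)
decide(_Status, Pending, Limit) when Pending > Limit ->
    {stop, too_many_locks};          % over the limit: terminate with an error
decide(stop, Pending, _Limit) when Pending > 0 ->
    keep_alive;                      % other callers are waiting: do not exit yet
decide(stop, 0, _Limit) ->
    {stop, normal};                  % last lock cleared: safe to terminate
decide(continue, _Pending, _Limit) ->
    continue.                        % ordinary reply, nothing to coordinate

demo() ->
    [decide(stop, 0, ?LIMIT),
     decide(stop, 3, ?LIMIT),
     decide(continue, 3, ?LIMIT),
     decide(stop, 501, ?LIMIT)].
```

The clause order matters: the limit check comes first, so a flooded process is stopped regardless of what the callback returned.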
OFFLINE EVENT RECEPTION
When inactive, BPE process states are kept solely in persistent database storage. BPE guarantees that processes can still receive and process asynchronous messages or subscriptions even when they are offline:
- Automatic Restoration: Every standard API call on BPE first triggers a start(load(ProcId), []) call. If the gen_server process is not running, it automatically loads state from the database and spawns a worker process.
- Delayed Event Queueing: If an event or a broadcast message is sent to a persisted process and the event type is not immediate, the message is appended to the persistent queue in Mnesia at /bpe/messages/queue/<ProcId>.
- Draining the Queue (Ping Cycle): When the process is spawned or initialized, init/1 schedules a periodic {timer, ping} message. When this info message is received by the process, it reads all stored events from the queue and processes them in order using process_event/3. It also drains any remaining queued items during the terminate/2 phase.
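The queue-then-drain behaviour can be modelled with a plain list in place of the Mnesia feed. Everything here is a hypothetical illustration: the module queue_sketch, the {Type, Payload} event shape, and the handler function are assumptions, and the real process relies on process_event/3 and persistent storage rather than an in-memory list.

```erlang
-module(queue_sketch).
-export([deliver/2, drain/2, demo/0]).

%% Events are modelled as {Type, Payload}; only the atom 'immediate' is special.
%% Immediate events bypass the queue, everything else is appended to its tail.
deliver({immediate, _} = Event, Queue) ->
    {handle_now, Event, Queue};
deliver(Event, Queue) ->
    {queued, Queue ++ [Event]}.

%% Drain on ping or terminate: handle events oldest first, leave the queue empty.
%% A list comprehension is used because it evaluates elements in list order.
drain(Handler, Queue) ->
    {[Handler(Event) || Event <- Queue], []}.

demo() ->
    {queued, Q1} = deliver({delayed, <<"a">>}, []),
    {handle_now, _, Q1} = deliver({immediate, <<"b">>}, Q1),
    {queued, Q2} = deliver({delayed, <<"c">>}, Q1),
    drain(fun({_Type, Payload}) -> Payload end, Q2).
```

In the demo the immediate event never enters the queue, so the drain step handles only the two delayed events, in arrival order.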