Commit 38a48d4b authored by Andrew

Merge branch 'opacity_enf'

parents c96b17c2 206f599b
......@@ -151,3 +151,14 @@ Examples/
#pycharm stuff
.idea/
# Edisyn tmp directory
tests/edisyn/
edisyn/
# Bosy test directory
tests/bosy/
# temp
temp.prism
temp.props
......@@ -4,4 +4,4 @@ include_trailing_comma = True
force_grid_wrap = 0
use_parentheses = True
line_length = 88
known_third_party = dd,igraph,pydash,tqdm
known_third_party = dd,edisyn,igraph,pydash,tqdm
Andrew Wintenberg <awintenb@umich.edu>
Andrew Wintenberg <awintenb@umich.edu> <awintenb@gmail.com>
Andrew Wintenberg <awintenb@umich.edu> <awintenb@mts.eecs.umich.edu>
Matthew Blischke <matblisc@umich.edu>
Matthew Blischke <matblisc@umich.edu> <matblisc@mts.eecs.umich.edu>
......@@ -166,3 +166,13 @@ class NFA(automata._Automata):
out = [self.Out(pair[1], label)]
out_list[pair[0]] = out
self.vs["out"] = out_list
def trans(self, source, event):
"""
Return the set of target vertex indices reached when the given event occurs from the given source vertex index
"""
targets = set()
for out in self.vs[source]["out"]:
if out.label == event:
targets.add(out.target)
return targets
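A minimal usage sketch of the new NFA.trans helper (not part of the diff). The two-state NFA and the event name "a" are illustrative, and the sketch assumes the add_vertices/add_edge/generate_out API used elsewhere in this commit.

from DESops.automata.NFA import NFA

# build a tiny NFA with a nondeterministic choice on event "a"
g = NFA()
g.add_vertices(2, names=["s0", "s1"])
g.add_edge(0, 0, "a")
g.add_edge(0, 1, "a")
g.generate_out()          # populate the per-vertex "out" lists that trans() reads

print(g.trans(0, "a"))    # {0, 1}: every target reachable from s0 on "a"
print(g.trans(1, "a"))    # empty set: "a" is not defined at s1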
......@@ -469,12 +469,51 @@ class _Automata:
"""
return self._graph.ecount()
def next_states(self, index):
"""
Return the set of vertex indices that can be reached from the given vertex index in a single event
"""
return {o.target for o in self.vs[index]["out"]}
def active_events(self, index):
"""
Return the set of events that can occur from the given vertex index
"""
return {o.label for o in self.vs[index]["out"]}
def events_between(self, X1, X2):
"""
Return the set of events that transition the system from X1 to X2
X1 and X2 should each be either a single vertex index or an iterable collection of vertex indices
"""
# treat single states as singleton list
if not isinstance(X1, Iterable):
X1 = [X1]
if not isinstance(X2, Iterable):
X2 = [X2]
events = set()
for x1 in X1:
for x2 in X2:
events.update(o.label for o in self.vs[x1]["out"] if o.target == x2)
return events
def trans(self, source, event):
"""
Return the target vertex index reached when the given event occurs from the given source vertex index, or None if the event is not defined there
"""
for out in self.vs[source]["out"]:
if out.label == event:
return out.target
return None
# TODO: this is legacy, should replace instances of this
# with the UR class method
def unobservable_reach(self, from_state: State_or_StateSet) -> Set[int]:
"""
Finds the set of states in the unobservable reach from the given state.
"""
if isinstance(from_state, set):
states = set()
......
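A short usage sketch of the new _Automata helpers (illustrative only; it assumes an automaton g whose per-vertex "out" lists have been generated, e.g. via generate_out(), and that vertex 0 and event "a" exist).

reachable = g.next_states(0)               # vertex indices reachable from state 0 in one event
enabled = g.active_events(0)               # events defined at state 0
linking = g.events_between(0, reachable)   # events taking state 0 into that set
target = g.trans(0, "a")                   # base-class version: one target index, or None;
                                           # the NFA override above returns a set instead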
......@@ -114,13 +114,23 @@ def product_linear(*automata: Automata_t) -> Automata_t:
if len(automata) < 2:
raise MissingAttributeError("More than one automaton is needed.")
# add init attribute to all automata so that both DFAs and NFAs will work
for g in automata:
if "init" not in g.vs.attributes():
g.vs["init"] = False
g.vs[0]["init"] = True
G1 = automata[0]
input_list = automata[1:]
for G2 in tqdm(
input_list, desc="Product Composition", disable=SHOW_PROGRESS is False
):
G_out = _Automata()
# result is nondeterministic if any input is nondeterministic
if any(isinstance(g, NFA) for g in automata):
G_out = NFA()
else:
G_out = DFA()
num_G2_states = len(G2.vs)
G_out_vertices = [
......@@ -128,6 +138,7 @@ def product_linear(*automata: Automata_t) -> Automata_t:
"index": i * num_G2_states + j,
"name": (x1["name"], x2["name"]),
"marked": x1["marked"] is True and x2["marked"] is True,
"init": x1["init"] and x2["init"],
"indexes": (x1.index, x2.index),
}
for i, x1 in enumerate(G1.vs)
......@@ -138,6 +149,7 @@ def product_linear(*automata: Automata_t) -> Automata_t:
names=[v["name"] for v in G_out_vertices],
marked=[v["marked"] for v in G_out_vertices],
indexes=[v["indexes"] for v in G_out_vertices],
init=[v["init"] for v in G_out_vertices],
)
G1_vertices = [
{
......@@ -570,6 +582,8 @@ def observer(G: Automata_t) -> Automata_t:
observer.Euc.update(G.Euc - G.Euo)
observer.Euo.clear()
observer.vs["marked"] = marked_list
observer.vs[0]["init"] = True
observer.add_edges(
transition_list, transition_label, check_DFA=False, fill_out=False
)
......
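A sketch of the behavior these hunks introduce (placeholder names G_dfa and G_nfa stand for a DFA and an NFA built elsewhere): product_linear now returns an NFA whenever any input is nondeterministic, and initial states are propagated through the "init" vertex attribute.

P = product_linear(G_dfa, G_nfa)                      # isinstance(P, NFA) is True here
initial = [v.index for v in P.vs.select(init=True)]   # initial states of the product
Obs = observer(G_nfa)                                 # the observer's vertex 0 now carries init=True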
......@@ -10,7 +10,7 @@ from collections import OrderedDict
from DESops.automata.NFA import NFA
def product_NFA(g_list, save_state_names=True, save_marked_states=True):
def product_NFA(g_list, save_state_names=True, save_marked_states=True, priorityOrder=False):
"""
Computes the product composition of 2 (or more) Automata, and returns
the resulting composition as an automaton.
......
"""
Operations involving transducer automata with events represented by pairs (e_in, e_out)
"""
from DESops.automata import NFA
def generic_product(g, h, state_attr_list, state_attr_map, edge_label_map,
allow_g_None_trans=True, allow_h_None_trans=True):
"""
Compute a generic product of two automata with provided semantics.
The product is computed by iterating over pairs of states of g and h that have been reached,
initialized with the initial states of g and h.
The attributes of the pair of states are computed by applying the provided map state_attr_map
to the attributes in the attr_list of the states in the pair.
Given an outgoing transition from each state in the current pair, a transition is added
to the product system if it is allowed by the provided edge_label_map, which also
computes the resulting label.
Transitions of the product in which one system moves while the other stays put are
enabled with the flags allow_g_None_trans and allow_h_None_trans.
Transitions where neither system moves are never allowed.
Parameters:
g: The first automaton for the product
h: The second automaton for the product
state_attr_list: The list of state attributes in the product
state_attr_map: The map from the attributes of a state of g and a state of h to the attributes in the product state
edge_label_map: The map from a pair of labels, one from a transition of g and one from a transition of h,
to whether the product transition should be allowed and what the corresponding product label is
allow_g_None_trans: whether or not to allow a transition where g does not move but h does
allow_h_None_trans: whether or not to allow a transition where h does not move but g does
Returns: the product of g and h
"""
state_list = {}
state_attr = {attr: [] for attr in state_attr_list}
edge_list = []
edge_labels = []
next_states_to_check = []
def add_state_pair(new_pair):
if new_pair not in state_list:
new_index = len(state_list)
state_list[new_pair] = new_index
new_attr = state_attr_map(g.vs[new_pair[0]], h.vs[new_pair[1]])
for attr in state_attr_list:
state_attr[attr].append(new_attr[attr])
next_states_to_check.append(new_pair)
return new_index
else:
return state_list[new_pair]
def add_pair_edge(source_pair, dest_pair, g_label, h_label):
add_edge, edge_label = edge_label_map(g_label, h_label)
if add_edge:
cur_ind = state_list[source_pair]
new_ind = add_state_pair(dest_pair)
edge_list.append((cur_ind, new_ind))
edge_labels.append(edge_label)
for v1 in g.vs.select(init=True):
for v2 in h.vs.select(init=True):
add_state_pair((v1.index, v2.index))
# the list next_states_to_check evaluates to False when empty
while next_states_to_check:
vert_pair = next_states_to_check.pop()
# Iterate through all new synchronized states found in last iteration, checking neighbors
# select edges with source at current vertex
g_es = g.es(_source=vert_pair[0])
h_es = h.es(_source=vert_pair[1])
for eg in g_es:
for eh in h_es:
dest_pair = (eg.target, eh.target)
add_pair_edge(vert_pair, dest_pair, eg['label'], eh['label'])
if allow_h_None_trans:
dest_pair = (eg.target, vert_pair[1])
add_pair_edge(vert_pair, dest_pair, eg['label'], None)
if allow_g_None_trans:
for eh in h_es:
dest_pair = (vert_pair[0], eh.target)
add_pair_edge(vert_pair, dest_pair, None, eh['label'])
"""
# Transitions where neither system moves are never allowed
if allow_h_None_trans:
add_pair_edge(vert_pair, vert_pair, None, None)
"""
g_comp = NFA()
g_comp.add_vertices(len(state_list), names=list(state_list.keys()), **state_attr)
g_comp.add_edges(edge_list, edge_labels)
g_comp.generate_out()
return g_comp
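A hypothetical sketch of supplying custom semantics to generic_product: the two maps below reproduce a plain synchronous product, assuming NFAs g and h whose vertices carry "init" and "marked" attributes and whose edges carry "label".

def sync_attr_map(gs, hs):
    # product state attributes computed from the pair of component states
    return {"init": gs["init"] and hs["init"],
            "marked": gs["marked"] and hs["marked"]}

def sync_label_map(g_label, h_label):
    # only joint moves on a common label are kept; None means "that side stays put"
    if g_label is None or h_label is None:
        return False, None
    return g_label == h_label, g_label

p = generic_product(g, h, {"init", "marked"}, sync_attr_map, sync_label_map,
                    allow_g_None_trans=False, allow_h_None_trans=False)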
def transducer_auto_product(t, g, t_attr_list=None, g_attr_list=None,
allow_g_None_trans=True, allow_h_None_trans=True):
"""
Compute an automaton where the labels of g are replaced according to the transducer t
"""
if not t_attr_list:
t_attr_list = set()
if not g_attr_list:
g_attr_list = set()
state_attr_list = {'init', 'marked'} | t_attr_list | g_attr_list
def state_attr_map(gs, hs):
return {'marked': gs['marked'] and hs['marked'],
'init': gs['init'] and hs['init']} | \
{attr: gs[attr] for attr in t_attr_list} | \
{attr: hs[attr] for attr in g_attr_list}
def edge_label_map(ge, he):
if not ge:
return he == '', ''
elif not he:
return ge[0] == '', ge[1]
else:
return ge[0] == he, ge[1]
return generic_product(t, g, state_attr_list, state_attr_map, edge_label_map,
allow_g_None_trans, allow_h_None_trans)
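A usage sketch of transducer_auto_product (illustrative; t is a transducer whose edge labels are (e_in, e_out) pairs, g is an automaton over the input events, and both carry "init" and "marked" vertex attributes).

# e.g. if t has a transition labeled ("a", "x") and g has one labeled "a",
# the joint move is kept and the resulting product edge is labeled "x"
relabeled = transducer_auto_product(t, g)
output_events = set(relabeled.es["label"])   # events over the output alphabet of t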
def transducer_transducer_product(t, g, t_attr_list=None, g_attr_list=None,
allow_g_None_trans=True, allow_h_None_trans=True):
"""
Compute the composed transducer: the inputs of t are matched with the outputs of g, so the result maps inputs of g to outputs of t
"""
if not t_attr_list:
t_attr_list = set()
if not g_attr_list:
g_attr_list = set()
state_attr_list = {'init', 'marked'} | t_attr_list | g_attr_list
def state_attr_map(gs, hs):
return {'marked': gs['marked'] and hs['marked'],
'init': gs['init'] and hs['init']} | \
{attr: gs[attr] for attr in t_attr_list} | \
{attr: hs[attr] for attr in g_attr_list}
def edge_label_map(ge, he):
if not ge:
return he[0] == '' and he[1] == '', ('', '')
elif not he:
return ge[0] == '', ('', ge[1])
else:
return ge[0] == he[1], (he[0], ge[1])
return generic_product(t, g, state_attr_list, state_attr_map, edge_label_map,
allow_g_None_trans, allow_h_None_trans)
def auto_auto_product(g, h, g_attr_list=None, h_attr_list=None,
allow_g_None_trans=True, allow_h_None_trans=True):
"""
Compute the usual synchronous product of automata
"""
if not h_attr_list:
h_attr_list = set()
if not g_attr_list:
g_attr_list = set()
state_attr_list = {'init', 'marked'} | h_attr_list | g_attr_list
def state_attr_map(gs, hs):
return {'marked': gs['marked'] and hs['marked'],
'init': gs['init'] and hs['init']} | \
{attr: gs[attr] for attr in g_attr_list} | \
{attr: hs[attr] for attr in h_attr_list}
def edge_label_map(ge, he):
if not ge:
return he == '', ''
elif not he:
return ge == '', ''
else:
return ge == he, ge
return generic_product(g, h, state_attr_list, state_attr_map, edge_label_map,
allow_g_None_trans, allow_h_None_trans)
def auto_auto_parallel_comp(g, h, g_attr_list=None, h_attr_list=None,
allow_g_None_trans=True, allow_h_None_trans=True):
"""
Compute the usual parallel composition or asynchronous product of automata
"""
common_events = g.events & h.events
if '' in common_events:
common_events.remove('')
if not h_attr_list:
h_attr_list = set()
if not g_attr_list:
g_attr_list = set()
state_attr_list = {'init', 'marked'} | h_attr_list | g_attr_list
def state_attr_map(gs, hs):
return {'marked': gs['marked'] and hs['marked'],
'init': gs['init'] and hs['init']} | \
{attr: gs[attr] for attr in g_attr_list} | \
{attr: hs[attr] for attr in h_attr_list}
def edge_label_map(ge, he):
if not ge:
return he not in common_events, he
elif not he:
return ge not in common_events, ge
else:
return ge == he, ge
return generic_product(g, h, state_attr_list, state_attr_map, edge_label_map,
allow_g_None_trans, allow_h_None_trans)
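A usage sketch of the parallel composition (illustrative; g and h are automata with an `events` attribute and "init"/"marked" vertex attributes): shared events must be executed jointly, while private events interleave freely.

shared = g.events & h.events            # events that must synchronize
gh = auto_auto_parallel_comp(g, h)
# a transition of gh on an event in `shared` exists only where both g and h enable it;
# an event private to one automaton moves that component alone while the other stays put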
def transducer_input_automaton(t):
"""
Construct an automaton representing the input language of the transducer t
"""
g = t.copy()
g.events = set()
g.es['label'] = [e[0] for e in g.es['label']]
g.generate_out()
return g
def transducer_output_automaton(t):
"""
Construct an automaton representing the output language of the transducer t
"""
g = t.copy()
g.events = set()
g.es['label'] = [e[1] for e in g.es['label']]
g.generate_out()
return g
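A sketch of the two projections (illustrative; t is a transducer with (e_in, e_out) edge labels): the input and output automata are copies of t with the pair labels projected onto their first and second components.

g_in = transducer_input_automaton(t)      # edges relabeled with e_in
g_out = transducer_output_automaton(t)    # edges relabeled with e_out
assert g_in.vcount() == g_out.vcount() == t.vcount()   # only the labels change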
......@@ -30,11 +30,17 @@ def find_inacc(G: _Automata, states_removed=set()) -> set:
if G.vcount() == 0:
# warnings.warn("Ac(): the given automaton is empty.")
return set()
if 0 in states_removed:
init_set = None
try:
init_set = set([v.index for v in G.vs.select(init=True)])
except KeyError:
init_set = {0}
if init_set.issubset(states_removed):
# warnings.warn("Initial state deleted.")
return set([v.index for v in G.vs])
good_states = {0}
good_states = init_set
stack = deque(good_states)
while len(stack) > 0:
index = stack.popleft()
......
......@@ -9,9 +9,11 @@ def construct_bisimulation(g):
Constructs and returns a minimal-state automaton that is bisimilar to g and that preserves the secret behavior of g
"""
events = set(g.es["label"])
if "secret" not in g.vs.attributes():
g.vs["secret"] = False
partition, table, e_dict = find_coarsest_bisimilar_partition(g, events)
h = NFA()
h = g.__class__()
h.add_vertices(len(partition))
h.vs["init"] = [any([g.vs[i]["init"] for i in block]) for block in partition]
h.vs["secret"] = [any([g.vs[i]["secret"] for i in block]) for block in partition]
......@@ -22,7 +24,7 @@ def construct_bisimulation(g):
state = block.pop()
for e in events:
# add transitions from the current block to every other block that event e leads to
next_set = table[state][0][e_dict[e]]
next_set = table[state][e_dict[e]]
for j in next_set:
h.add_edge(i, j, e)
......@@ -46,8 +48,8 @@ def find_coarsest_bisimilar_partition(g, events):
# initial partition separates secret and nonsecret states
partition = [
{v.index for v in g.vs if v["secret"]},
{v.index for v in g.vs if not v["secret"]},
{v.index for v in g.vs if v["secret"]},
]
if set() in partition:
partition.remove(set())
......@@ -70,7 +72,7 @@ def find_coarsest_bisimilar_partition(g, events):
for j, value in enumerate(row):
row[j] = frozenset(value)
# add a secrecy flag to prevent secret and nonsecret states in the same block
table[i] = (tuple(row), g.vs[i]["secret"])
table[i] = (*row, g.vs[i]["secret"])
# row_dict maps rows of the table to sets of states that produced that row
row_dict = dict()
......@@ -89,3 +91,99 @@ def find_coarsest_bisimilar_partition(g, events):
# otherwise repeat the process using the new partition
else:
partition = new_partition
def construct_simulation(g):
"""
Constructs and returns a minimal-state automaton that is similar to g and that preserves the secret behavior of g
"""
events = set(g.es["label"])
partition, table, e_dict = find_coarsest_similar_partition(g, events)
h = NFA()
h.add_vertices(len(partition))
h.vs["init"] = [any([g.vs[i]["init"] for i in block]) for block in partition]
h.vs["secret"] = [any([g.vs[i]["secret"] for i in block]) for block in partition]
h.Euo = g.Euo
for i, block in enumerate(partition):
for e in events:
# add transitions from the current block to every other block that event e leads to
next_set = set.union(*[table[state][e_dict[e]] for state in block])
for j in next_set:
h.add_edge(i, j, e)
h.generate_out()
return h
def find_coarsest_similar_partition(g, events):
"""
Finds the coarsest partition of states of g that will produce an automaton that
is similar to g and whose secret behavior is identical to that of g
Returns:
the partition as a list of sets of state indices,
a table whose (i,j) entry is the set of block indices reached by state i via event j,
a dict mapping events to their column in the table
"""
e_dict = dict()
for i, e in enumerate(events):
e_dict[e] = i
# initial partition separates secret and nonsecret states
partition = [
{v.index for v in g.vs if not v["secret"]},
{v.index for v in g.vs if v["secret"]},
]
if set() in partition:
partition.remove(set())
# loop until we return in the final if statement
while True:
# s_dict maps each state index to the block containing it
s_dict = dict()
for i, block in enumerate(partition):
for state in block:
s_dict[state] = i
# table tells which blocks each state transitions to with each event
table = [[set() for _ in events] for _ in g.vs]
for t in g.es:
table[t.source][e_dict[t["label"]]].add(s_dict[t.target])
# split blocks so that, for each event, every state in a block transitions into the same set of blocks or has no transition on that event
new_partition = list()
for part in partition:
targets = [set() for _ in events]
new_part1 = set()
new_part2 = set()
for state in part:
# check whether current state can be joined with the new part we have so far
good = True
for i in range(len(events)):
old = targets[i]
new = table[state][i]
if old and new and old != new:
good = False
break
# separate good states and bad states
if good:
new_part1.add(state)
# update the non-empty targets of the "good" partition
targets = [
targets[i].union(table[state][i]) for i in range(len(events))
]
else:
new_part2.add(state)
# add new parts to new partition
new_partition.append(new_part1)
if new_part2:
new_partition.append(new_part2)
# if the partition size is unchanged, then we have found the coarsest similar partition
if len(new_partition) == len(partition):
return partition, table, e_dict
# otherwise repeat the process using the new partition
else:
partition = new_partition
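A usage sketch of the two reductions (illustrative; g is an automaton whose vertices carry "init" and "secret" attributes, whose edges carry "label", and whose Euo set is defined): both constructions preserve the secret behavior of g while reducing the state count.

g_bisim = construct_bisimulation(g)    # bisimilar quotient, same class as g
g_sim = construct_simulation(g)        # simulation-based reduction, returned as an NFA
assert g_bisim.vcount() <= g.vcount() and g_sim.vcount() <= g.vcount()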
The files in this folder contain functions related to enforcing opacity using the reactive synthesis tool BoSy (https://github.com/reactive-systems/bosy)
The main function that runs the full synthesis process is run_bosy() in bosy_interface.py.
Some modifications must be made to the base BoSy installation so that it uses a Buchi automaton we construct, instead of converting an LTL specification to a Buchi automaton.
### BoSy Installation
In a directory external to M-DESops, clone BoSy and check out the correct commit with
$ git clone https://github.com/reactive-systems/bosy.git
$ cd bosy
$ git checkout 2bad0d853e1b023aa27a6a0d8f8a129b5bd96ed7
Copy the file `bosy_patch.patch` from this directory of M-DESops into the newly created `bosy/` directory.
In the `bosy/` directory, apply the patch with the command
$ git apply bosy_patch.patch
Next, BoSy can be installed as usual; see the instructions at https://github.com/reactive-systems/bosy. If the necessary dependencies (e.g. Swift) are installed, this can typically be done by running `make`.
import getopt
import os.path
import sys
from collections.abc import Iterable
from pathlib import Path
from DESops.opacity.bosy.buchi import add_safe_state, compose_buchi, construct_buchi_cso
from DESops.opacity.bosy.edit_to_bosy import get_bool_vars, write_bosy_hyper
from DESops.opacity.bosy.parse_smv import (
make_human_readable,
read_smv,
write_partial_smv,
)
from DESops.opacity.bosy.spin import ltl2spin, read_spin, write_spin
# This program relies on BoSy and Aiger.
# Note: BoSy relies on Swift. Ensure it is in your PATH
# Ensure bosy is properly installed and placed in a directory with write/execute permissions
# (I had to move it to my home directory)
# Change the path to Bosy here appropriately
bosy_path = "/".join([str(Path.home()), "libraries/bosy"])
# Also ensure that Aiger is installed at the following path
aiger_path