Commit aca763de authored by Andrew's avatar Andrew
Browse files

Added beginning of tree automata code. Synthesis is still buggy!

parent c833821e
This diff is collapsed.
import itertools
from collections import namedtuple
class Pipeline:
def __init__(self, in_type):
self.in_type = in_type
self.out_type = DataType({})
self.master_type = in_type
self.process_list = []
def get_type(self, names):
return self.master_type.subtype(names)
def append_process(self, process):
self.master_type = self.master_type.product_type(process.out_type)
self.out_type = self.out_type.product_type(process.out_type)
def check_valid(self):
if not self.process_list:
return True
if not self.in_type.is_subtype(self.process_list[0]):
return False
for p1, p2 in zip(self.process_list[:-1], self.process_list[1:]):
if not p1.out_type.is_subtype(p2.in_type):
return False
for i, p1 in enumerate(self.process_list):
for p2 in self.process_list[i+1:]:
if not p1.out_type.is_disjoint(p2.out_type):
return False
if not p1.out_type.is_disjoint(self.in_type):
return False
return True
class Process:
def __init__(self, in_type, out_type):
self.in_type = in_type
self.out_type = out_type
# TODO: var names are unordered. Need to add check for equality which accounts for this
class DataType:
def __init__(self, var_value_dict):
# self._var_value_dict = var_value_dict
self._var_value_dict = {k: frozenset(v) for k, v in var_value_dict.items()}
self.TupleClass = namedtuple('datatype', var_value_dict.keys())
def __call__(self, **kwargs):
return self.TupleClass(**kwargs)
def __iter__(self):
for val in itertools.product(*[self.var_values(name) for name in self.var_names()]):
yield self.TupleClass._make(val)
def __eq__(self, other):
return self._var_value_dict == other._var_value_dict
def __hash__(self):
# TODO make sure this is a valid hash
return hash(frozenset(self._var_value_dict.items()))
def var_names(self):
return self._var_value_dict.keys()
def var_values(self, name):
return self._var_value_dict[name]
def subtype(self, names):
return DataType({name: self._var_value_dict[name] for name in names})
def is_subtype(self, other):
for name in self.var_names():
other_vals = other._var_value_dict.get(name)
if other_vals is None or not self._var_value_dict[name].issubset(other_vals):
return False
return True
def is_supertype(self, other):
return other.is_subtype(self)
def is_disjoint(self, other):
return self.var_names().isdisjoint(other.var_names())
def is_type_of(self, val):
if not val.isinstance(self.TupleClass):
return False
for k, v in val._asdict().items():
if v not in self.var_values(k):
return False
return True
def projection(self, val):
return self.TupleClass(**{k: v for k, v in val._asdict().items() if k in self.var_names()})
def inverse_projection(self, val):
val_dict = val._asdict()
new_names = list(self.var_names() - val_dict.keys())
other_values = itertools.product(*[self.var_values(name) for name in new_names])
return {self.TupleClass(**(val_dict | dict(zip(new_names, o_val)))) for o_val in other_values}
def inverse_projection_gen(self, val):
val_dict = val._asdict()
new_names = list(self.var_names() - val_dict.keys())
for o_val in itertools.product(*[self.var_values(name) for name in new_names]):
yield self.TupleClass(**(val_dict | dict(zip(new_names, o_val))))
def possible_values(self):
values = itertools.product(*[self.var_values(name) for name in self.var_names()])
return {self.TupleClass._make(val) for val in values}
def from_subvalues(self, *subvalues):
return self(**{k: v for sv in subvalues for k, v in sv._asdict().items()})
def product_type(self, *others):
var_value_dict = {}
var_value_dict |= self._var_value_dict
for other in others:
var_value_dict |= other._var_value_dict
return DataType(var_value_dict)
def quotient_type(self, subtype):
if not self.is_supertype(subtype):
raise ValueError("Can only quotient type by a subtype")
return self.subtype(self.var_names() - subtype.var_names())
def stringify(self):
return DataType({str(key): {str(v) for v in vals}
for key, vals in self._var_value_dict.items()})
def is_empty_type(self):
return len(self.var_names()) == 0
empty_type = DataType({})
empty_value = empty_type()
import dist_process
def pipeline_synthesis(pipe, spec_ata, delay=False):
Perform synthesis on a pipeline architecture for the given specification automaton
The automaton should have input type given by the non-environmental (input) variables of the pipeline
and direction type given by the environmental (input) variables of the pipeline
A = []
N = []
B = [spec_ata]
Iteratively reduce the specification automaton by "nondeterministically" implementing
the processes of the pipeline in order.
for i, p in enumerate(pipe.process_list):
print(f"Reducing process {i}")
# Narrow the directions of the specification to the input of the current process
# Make the result nondeterministic for change operation / nonemptiness check
# No need to compute change operation for last process
if len(B) == len(pipe.process_list):
# Nondeterministically implement the current process
# Change the direction type of the specification to the output of the current process
B.append(N[-1].change_pipeline(p.out_type, delay=delay).copy_int())
# check emptiness for last automaton and find an implementation if one exists
print("Checking emptiness")
empty, member_ata = N[-1].is_empty()
return empty
if empty:
return {"realizable": not empty, "solution": None}
# iterate and find implementation for the previous processes
ata_list = [member_ata]
lifted_ata_list = [member_ata]
for i, p in reversed(list(enumerate(pipe.process_list[:-1]))):
last_lifted_ata = lifted_ata_list[-1]
# compose specification for processes with computed implementations of processes
new_lifted_ata = inverse_change(inverse_narrow(
last_lifted_ata, p.out_type), p.in_type).AND(N[i])
# compute a corresponding implementation for the processes
_, new_lifted_ata = new_lifted_ata.is_empty()
# restrict implementation to current process
new_ata = new_lifted_ata.inverse_widen(p.out_type)
if empty:
return {"realizable": not empty, "solution": ata_list}
def linear_to_tree_specification(g, dir_type, mode="mealy"):
if mode == "mealy":
delay = False
elif mode == "moore":
delay = True
raise ValueError("Mode must be mealy/moore")
return g.change_pipeline(dir_type, mode='AND', delay=delay)
def linear_to_tree_specification_pipeline(g, pipe):
This method takes the word automaton g and appropriately shifts variables according to the pipeline arch
so that strategies composed with delay have traces accepted by the shifted version correspond
to strategies composed without delay have traces accepted by g.
This method then takes this shifted word automaton and converts it into a tree automaton
with directions given by the input to the architecture
TODO - for efficiency, shifting should be done inbetween each step of synthesis rather than all at the beginning
empty_type = dist_process.empty_type
augmented_output_types = [empty_type.product_type(*[proc.out_type for proc in pipe.process_list[i:]])
for i in range(len(pipe.process_list))]
# TODO this is buggy
for t1, t2 in zip(augmented_output_types[:-1], augmented_output_types[1:]):
g = g.delay_in(t1)
g = g.delay_in(t2)
for t in augmented_output_types:
g = g.delay_in(t)
return g.change_pipeline(pipe.in_type, mode='AND', delay=False)
import DESops.tree.ATA as ATA
import DESops.tree.ATA as ASA
import DESops.tree.dist_process as dist_process
import DESops.tree.dist_synthesis as dist_synthesis
import DESops
from DESops import NFA
from itertools import chain
empty_type = dist_process.DataType({})
empty_val = empty_type()
def inferability(pipe):
in_type = pipe.get_type(["infer", "info"])
info_values = pipe.in_type.var_values("info")
ata_inf = ATA.AWA(in_type)
# Label of inference should be the same as the label of the info
ata_inf.init_state = "q"
ata_inf.add_transitions({("q", in_type(info=info, infer=info)):
ata_inf.alg.Symbol(("q", empty_val))
for info in info_values})
return ata_inf
def secrecy(pipe, g_ns):
in_type = pipe.get_type(["Eo"])
ata_sec = ATA.AWA(in_type)
# event output should be constrained by nonsecret automaton
ata_sec.init_state =[0].index
ata_sec.add_transitions({(e.source, in_type(Eo=e["label"])):
ata_sec.alg.Symbol((, empty_val))
for e in})
return ata_sec
def block_assumption(pipe):
block_type = pipe.get_type(["block"])
in_type = block_type.product_type(pipe.in_type)
ata_block = ATA.AWA(in_type)
# Input must hold its values constant when output is not blocking
# states represents last input
ata_block.add_states({d for d in pipe.in_type})
ata_block.init_state = "block_init"
ata_block.add_transitions({(q, in_type.from_subvalues(q, block_type(block=1))):
ata_block.alg.Symbol((q, empty_val))
for q in pipe.in_type})
ata_block.add_transitions({(ata_block.init_state, in_type.from_subvalues(q, block_type(block=0))):
ata_block.alg.Symbol((qp, empty_val))
for q in pipe.in_type for qp in pipe.in_type})
ata_block.add_transitions({(ata_block.init_state, in_type.from_subvalues(qp, block_type(block=1))):
ata_block.alg.Symbol((qp, empty_val))
for qp in pipe.in_type})
return ata_block
def block_guarantee(pipe, k_bound=0):
in_type = pipe.get_type(["block"])
ata_block = ATA.AWA(in_type)
if k_bound is None:
ata_block.init_state = 0
both = ata_block.alg.AND(ata_block.alg.Symbol((0, empty_val)),
ata_block.alg.Symbol((0, empty_val)))
ata_block.add_transitions({(0, in_type(block=0)): both,
(0, in_type(block=1)): both,
(1, in_type(block=0)):
(1, in_type(block=1)):
ata_block.alg.Symbol((1, empty_val))})
# output must not block more than k times in a row
ata_block.add_states(set(range(k_bound + 1)))
ata_block.init_state = 0
ata_block.add_transitions({(q, in_type(block=0)):
ata_block.alg.Symbol((0, empty_val))
for q in ata_block.states})
ata_block.add_transitions({(q, in_type(block=1)):
ata_block.alg.Symbol((q+1, empty_val))
for q in range(0, k_bound)})
return ata_block
def plant_dyn(pipe, g_plant):
Encode plant behavior from NFA into AFA with blocking
block_type = pipe.get_type(["block"])
in_type = block_type.product_type(pipe.in_type)
ata_dyn = ATA.AWA(in_type)
# input to architecture must agree with the plant, accounting for block
ata_dyn.add_states({v for v in g_plant.vs.indices})
ata_dyn.init_state =[0].index
for v in ata_dyn.states:
for e in
ata_dyn.add_transitions({(v, in_type(block=0, Ei=e["label"], info=g_plant.vs["info"][])):
ata_dyn.alg.Symbol((, empty_val))
ata_dyn.add_transitions({(v, in_type.from_subvalues(block_type(block=1), in_value)):
ata_dyn.alg.Symbol((v, empty_val))
for in_value in pipe.in_type})
return ata_dyn
def create_dist_problem(g, info_attr="info"):
master_type = dist_process.DataType({
"block": {0, 1},
"info": set(g.vs[info_attr]),
"infer": set(g.vs[info_attr])
pipe = dist_process.Pipeline(master_type.subtype({"info", "Ei"}))
edit_process = dist_process.Process(
master_type.subtype({"block", "Eo"})
infer_process = dist_process.Process(
g_ns = g.copy()
# TODO remove simplify if possible?
A_inf = inferability(pipe).simplify()
A_sec = secrecy(pipe, g_ns).simplify()
A_block_assumption = block_assumption(pipe).complement().simplify()
A_block_guarantee = block_guarantee(pipe, k_bound=0).simplify()
A_plant_dyn = plant_dyn(pipe, g).complement().simplify()
A_spec = A_block_guarantee.AND((A_inf.AND(A_sec)).OR(A_block_assumption, A_plant_dyn)).simplify()
A_spec = A_spec.widen_output(master_type)
#A_spec = dist_synthesis.linear_to_tree_specification(A_spec, pipe.in_type, mode="mealy")
A_spec = dist_synthesis.linear_to_tree_specification_pipeline(A_spec, pipe)
return pipe, A_spec
if __name__ == '__main__':
g = NFA()
g.vs['init'] = [True, False]
g.vs['secret'] = [False, True]
g.vs['info'] = [False, True]
g.add_edges([(0, 1), (1, 0), (0, 0), (0, 0)], ['a', 'a', 'b', 'c'])
print("Constructing problem")
pipe, spec = create_dist_problem(g)
realizable = not dist_synthesis.pipeline_synthesis(pipe, spec, delay=True)
from DESops.tree import ATA, dist_process, dist_synthesis
from DESops.automata import DFA
empty_type = dist_process.DataType({})
empty_val = empty_type()
def match_spec():
in_type = dist_process.DataType({'p_out': {1,2}})
dir_type = dist_process.DataType({'p_in': {1,2}})
ata_inf = ATA.ATA(in_type, dir_type)
ata_inf.init_state = "q"
ata_inf.add_transitions({("q", in_type(p_out=info)):
ata_inf.alg.Symbol(("q", dir_type(p_in=info)))
for info in {1,2}})
return ata_inf
def lin_match_spec():
in_type = dist_process.DataType({'y':{0,1}, 'x':{0,1}, 'p_out': {0,1}, 'p_in': {0,1}})
ata_inf = ATA.AWA(in_type.subtype(['p_in', 'p_out']))
ata_inf.init_state = 0
ata_inf.add_transitions({(0, ata_inf.in_type(p_out=info, p_in=info)):
ata_inf.alg.Symbol((0, dist_process.empty_value))
for info in {0,1}})
ata_o = ATA.AWA(in_type.subtype(['x', 'p_in']))
ata_o.init_state = 0
ata_o.add_transitions({(0, ata_o.in_type(p_in=info, x=1-info)):
ata_o.alg.Symbol((0, dist_process.empty_value))
for info in {0, 1}})
ata_inf = ata_inf.OR(ata_o).widen_output(in_type)
return ata_inf
def lin_match_spec_2():
in_type = dist_process.DataType({'x':{0,1}, 'p_out': {0,1}, 'p_in': {0,1}})
ata_inf = ATA.AWA(in_type.subtype(['p_in', 'p_out']))
ata_inf.init_state = 0
ata_inf.add_transitions({(0, ata_inf.in_type(p_out=info, p_in=info)):
ata_inf.alg.Symbol((0, dist_process.empty_value))
for info in {0,1}})
return ata_inf.widen_output(in_type)
def lin_spec():
in_type = dist_process.DataType({'p_out': {0, 1}, 'p_in': {0, 1}})
ata_inf = ATA.AWA(in_type)
ata_inf.add_states({0, 1, 2})
ata_inf.init_state = 0
ata_inf.add_transitions({(0, ata_inf.in_type(p_out=0, p_in=1)):
ata_inf.alg.Symbol((1, dist_process.empty_value))})
ata_inf.add_transitions({(0, ata_inf.in_type(p_out=0, p_in=0)):
ata_inf.alg.Symbol((2, dist_process.empty_value))})
ata_inf.add_transitions({(1, ata_inf.in_type(p_out=1, p_in=0)):
ata_inf.alg.Symbol((1, dist_process.empty_value))})
ata_inf.add_transitions({(2, ata_inf.in_type(p_out=0, p_in=1)):
ata_inf.alg.Symbol((2, dist_process.empty_value))})
ata_inf.add_transitions({(1, ata_inf.in_type(p_out=1, p_in=1)):
ata_inf.alg.Symbol((1, dist_process.empty_value))})
ata_inf.add_transitions({(2, ata_inf.in_type(p_out=0, p_in=0)):
ata_inf.alg.Symbol((2, dist_process.empty_value))})
return ata_inf
A = match_spec()
#A = A.to_nondet()
empty, tree = A.is_empty()
A = lin_match_spec_2()
#A_tree = dist_synthesis.linear_to_tree_specification(A, A.in_type.subtype(['p_in']), mode="mealy")
pipe = dist_process.Pipeline(A.in_type.subtype({"p_in"}))
p1 = dist_process.Process(
p2 = dist_process.Process(
A_spec = dist_synthesis.linear_to_tree_specification_pipeline(A, pipe)
res = dist_synthesis.pipeline_synthesis(pipe, A_spec, delay=True)
B = A_tree.delay_in(p2.out_type)
C = B.delay_in(A_tree.in_type)
D = B.change_pipeline(p1.out_type, delay=True)
#B = A_tree.change_pipeline(p1.out_type, delay=False)
res = dist_synthesis.pipeline_synthesis(pipe, A_tree, delay=False)
#B = A.delay_in(A.in_type.quotient_type(pipe.in_type))
C = B.delay_in(A.in_type.quotient_type(pipe.in_type))
D = C.delay_in(p2.out_type)
#E = D.change_pipeline(pipe.in_type, mode='AND', delay=True)
#F = E.to_nondet().change_pipeline(p1.out_type, mode='OR', delay=True).copy_int()
A_tree = dist_synthesis.linear_to_tree_specification(D, pipe.in_type)
res = dist_synthesis.pipeline_synthesis(pipe, A_tree, delay=True)
A = lin_spec()
A = A.to_tree_automaton(A.in_type.subtype(['p_in']))
#A = A.delay_input_mod()
A = A.to_nondet()
#A = A.change_pipeline(A.in_type)
empty, B = A.is_empty()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment