|
|
@ -10,20 +10,35 @@ Circuit graphs also define an ordering of inputs, outputs and other nodes to eas |
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
|
|
|
|
|
|
from collections import deque, defaultdict |
|
|
|
from collections import deque, defaultdict |
|
|
|
import re |
|
|
|
import re |
|
|
|
|
|
|
|
from typing import Union |
|
|
|
|
|
|
|
|
|
|
|
import numpy as np |
|
|
|
import numpy as np |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GrowingList(list): |
|
|
|
class GrowingList(list): |
|
|
|
def __setitem__(self, index, value): |
|
|
|
def __setitem__(self, index, value): |
|
|
|
if index >= len(self): |
|
|
|
if value is None: self.has_nones = True |
|
|
|
self.extend([None] * (index + 1 - len(self))) |
|
|
|
if index == len(self): return super().append(value) |
|
|
|
|
|
|
|
if index > len(self): |
|
|
|
|
|
|
|
super().extend([None] * (index + 1 - len(self))) |
|
|
|
|
|
|
|
self.has_nones = True |
|
|
|
super().__setitem__(index, value) |
|
|
|
super().__setitem__(index, value) |
|
|
|
|
|
|
|
|
|
|
|
def free_index(self): |
|
|
|
def __getitem__(self, index): |
|
|
|
return next((i for i, x in enumerate(self) if x is None), len(self)) |
|
|
|
if isinstance(index, slice): return super().__getitem__(index) |
|
|
|
|
|
|
|
return super().__getitem__(index) if index < len(self) else None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@property |
|
|
|
|
|
|
|
def free_idx(self): |
|
|
|
|
|
|
|
fi = len(self) |
|
|
|
|
|
|
|
if hasattr(self, 'has_nones') and self.has_nones: |
|
|
|
|
|
|
|
fi = next((i for i, x in enumerate(self) if x is None), len(self)) |
|
|
|
|
|
|
|
self.has_nones = fi < len(self) |
|
|
|
|
|
|
|
return fi |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class IndexList(list): |
|
|
|
class IndexList(list): |
|
|
@ -76,10 +91,10 @@ class Node: |
|
|
|
by allocating an array or list :code:`my_data` of length :code:`len(n.circuit.nodes)` and |
|
|
|
by allocating an array or list :code:`my_data` of length :code:`len(n.circuit.nodes)` and |
|
|
|
accessing it by :code:`my_data[n.index]` or simply by :code:`my_data[n]`. |
|
|
|
accessing it by :code:`my_data[n.index]` or simply by :code:`my_data[n]`. |
|
|
|
""" |
|
|
|
""" |
|
|
|
self.ins = GrowingList() |
|
|
|
self.ins: list[Line] = GrowingList() |
|
|
|
"""A list of input connections (:class:`Line` objects). |
|
|
|
"""A list of input connections (:class:`Line` objects). |
|
|
|
""" |
|
|
|
""" |
|
|
|
self.outs = GrowingList() |
|
|
|
self.outs: list[Line] = GrowingList() |
|
|
|
"""A list of output connections (:class:`Line` objects). |
|
|
|
"""A list of output connections (:class:`Line` objects). |
|
|
|
""" |
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
@ -135,7 +150,7 @@ class Line: |
|
|
|
Use the explicit case only if connections to specific pins are required. |
|
|
|
Use the explicit case only if connections to specific pins are required. |
|
|
|
It may overwrite any previous line references in the connection list of the nodes. |
|
|
|
It may overwrite any previous line references in the connection list of the nodes. |
|
|
|
""" |
|
|
|
""" |
|
|
|
def __init__(self, circuit, driver, reader): |
|
|
|
def __init__(self, circuit: Circuit, driver: Union[Node, tuple[Node, int]], reader: Union[Node, tuple[Node, int]]): |
|
|
|
self.circuit = circuit |
|
|
|
self.circuit = circuit |
|
|
|
"""The :class:`Circuit` object the line is part of. |
|
|
|
"""The :class:`Circuit` object the line is part of. |
|
|
|
""" |
|
|
|
""" |
|
|
@ -147,7 +162,7 @@ class Line: |
|
|
|
by allocating an array or list :code:`my_data` of length :code:`len(l.circuit.lines)` and |
|
|
|
by allocating an array or list :code:`my_data` of length :code:`len(l.circuit.lines)` and |
|
|
|
accessing it by :code:`my_data[l.index]` or simply by :code:`my_data[l]`. |
|
|
|
accessing it by :code:`my_data[l.index]` or simply by :code:`my_data[l]`. |
|
|
|
""" |
|
|
|
""" |
|
|
|
if not isinstance(driver, tuple): driver = (driver, driver.outs.free_index()) |
|
|
|
if not isinstance(driver, tuple): driver = (driver, driver.outs.free_idx) |
|
|
|
self.driver = driver[0] |
|
|
|
self.driver = driver[0] |
|
|
|
"""The :class:`Node` object that drives this line. |
|
|
|
"""The :class:`Node` object that drives this line. |
|
|
|
""" |
|
|
|
""" |
|
|
@ -157,7 +172,7 @@ class Line: |
|
|
|
This is the position in the list :py:attr:`Node.outs` of the driving node this line referenced from: |
|
|
|
This is the position in the list :py:attr:`Node.outs` of the driving node this line referenced from: |
|
|
|
:code:`self.driver.outs[self.driver_pin] == self`. |
|
|
|
:code:`self.driver.outs[self.driver_pin] == self`. |
|
|
|
""" |
|
|
|
""" |
|
|
|
if not isinstance(reader, tuple): reader = (reader, reader.ins.free_index()) |
|
|
|
if not isinstance(reader, tuple): reader = (reader, reader.ins.free_idx) |
|
|
|
self.reader = reader[0] |
|
|
|
self.reader = reader[0] |
|
|
|
"""The :class:`Node` object that reads this line. |
|
|
|
"""The :class:`Node` object that reads this line. |
|
|
|
""" |
|
|
|
""" |
|
|
@ -334,15 +349,16 @@ class Circuit: |
|
|
|
def get_or_add_fork(self, name): |
|
|
|
def get_or_add_fork(self, name): |
|
|
|
return self.forks[name] if name in self.forks else Node(self, name) |
|
|
|
return self.forks[name] if name in self.forks else Node(self, name) |
|
|
|
|
|
|
|
|
|
|
|
def remove_dangling_nodes(self, root_node:Node): |
|
|
|
def remove_dangling_nodes(self, root_node:Node, keep=[]): |
|
|
|
if len([l for l in root_node.outs if l is not None]) > 0: return |
|
|
|
if len([l for l in root_node.outs if l is not None]) > 0: return |
|
|
|
lines = [l for l in root_node.ins if l is not None] |
|
|
|
lines = [l for l in root_node.ins if l is not None] |
|
|
|
drivers = [l.driver for l in lines] |
|
|
|
drivers = [l.driver for l in lines] |
|
|
|
|
|
|
|
if root_node in keep: return |
|
|
|
root_node.remove() |
|
|
|
root_node.remove() |
|
|
|
for l in lines: |
|
|
|
for l in lines: |
|
|
|
l.remove() |
|
|
|
l.remove() |
|
|
|
for d in drivers: |
|
|
|
for d in drivers: |
|
|
|
self.remove_dangling_nodes(d) |
|
|
|
self.remove_dangling_nodes(d, keep=keep) |
|
|
|
|
|
|
|
|
|
|
|
def eliminate_1to1_forks(self): |
|
|
|
def eliminate_1to1_forks(self): |
|
|
|
"""Removes all forks that drive only one node. |
|
|
|
"""Removes all forks that drive only one node. |
|
|
@ -370,6 +386,21 @@ class Circuit: |
|
|
|
in_line.reader_pin = out_reader_pin |
|
|
|
in_line.reader_pin = out_reader_pin |
|
|
|
in_line.reader.ins[in_line.reader_pin] = in_line |
|
|
|
in_line.reader.ins[in_line.reader_pin] = in_line |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def remove_forks(self): |
|
|
|
|
|
|
|
ios = set(self.io_nodes) |
|
|
|
|
|
|
|
for n in list(self.forks.values()): |
|
|
|
|
|
|
|
if n in ios: continue |
|
|
|
|
|
|
|
d = None |
|
|
|
|
|
|
|
if (l := n.ins[0]) is not None: |
|
|
|
|
|
|
|
d = l.driver |
|
|
|
|
|
|
|
l.remove() |
|
|
|
|
|
|
|
for l in list(n.outs): |
|
|
|
|
|
|
|
if l is None: continue |
|
|
|
|
|
|
|
r, rp = l.reader, l.reader_pin |
|
|
|
|
|
|
|
l.remove() |
|
|
|
|
|
|
|
if d is not None: Line(self, d, (r, rp)) |
|
|
|
|
|
|
|
n.remove() |
|
|
|
|
|
|
|
|
|
|
|
def substitute(self, node, impl): |
|
|
|
def substitute(self, node, impl): |
|
|
|
"""Replaces a given node with the given implementation circuit. |
|
|
|
"""Replaces a given node with the given implementation circuit. |
|
|
|
|
|
|
|
|
|
|
@ -428,7 +459,7 @@ class Circuit: |
|
|
|
for l, ll in zip(impl_out_lines, node_out_lines): # connect outputs |
|
|
|
for l, ll in zip(impl_out_lines, node_out_lines): # connect outputs |
|
|
|
if ll is None: |
|
|
|
if ll is None: |
|
|
|
if l.driver in node_map: |
|
|
|
if l.driver in node_map: |
|
|
|
self.remove_dangling_nodes(node_map[l.driver]) |
|
|
|
self.remove_dangling_nodes(node_map[l.driver], keep=set(self.s_nodes)) |
|
|
|
continue |
|
|
|
continue |
|
|
|
if len(l.reader.outs) > 0: # output is also read by impl. circuit, connect to fork. |
|
|
|
if len(l.reader.outs) > 0: # output is also read by impl. circuit, connect to fork. |
|
|
|
ll.driver = node_map[l.reader] |
|
|
|
ll.driver = node_map[l.reader] |
|
|
@ -447,6 +478,21 @@ class Circuit: |
|
|
|
if n.kind in tlib.cells: |
|
|
|
if n.kind in tlib.cells: |
|
|
|
self.substitute(n, tlib.cells[n.kind][0]) |
|
|
|
self.substitute(n, tlib.cells[n.kind][0]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def remove_constants(self): |
|
|
|
|
|
|
|
c1gen = None |
|
|
|
|
|
|
|
for n in self.nodes: |
|
|
|
|
|
|
|
if n.kind == '__const0__': # just remove, unconnected inputs are defined 0. |
|
|
|
|
|
|
|
for l in n.outs: |
|
|
|
|
|
|
|
l.remove() |
|
|
|
|
|
|
|
n.remove() |
|
|
|
|
|
|
|
elif n.kind == '__const1__': |
|
|
|
|
|
|
|
if c1gen is None: c1gen = Node(self, '__const1gen__', 'INV1') # one unique const 1 generator |
|
|
|
|
|
|
|
for l in n.outs: |
|
|
|
|
|
|
|
r, rp = l.reader, l.reader_pin |
|
|
|
|
|
|
|
l.remove() |
|
|
|
|
|
|
|
Line(self, c1gen, (r, rp)) |
|
|
|
|
|
|
|
n.remove() |
|
|
|
|
|
|
|
|
|
|
|
def copy(self): |
|
|
|
def copy(self): |
|
|
|
"""Returns a deep copy of the circuit. |
|
|
|
"""Returns a deep copy of the circuit. |
|
|
|
""" |
|
|
|
""" |
|
|
@ -501,14 +547,15 @@ class Circuit: |
|
|
|
substrings 'dff' or 'latch' are yielded first. |
|
|
|
substrings 'dff' or 'latch' are yielded first. |
|
|
|
""" |
|
|
|
""" |
|
|
|
visit_count = np.zeros(len(self.nodes), dtype=np.uint32) |
|
|
|
visit_count = np.zeros(len(self.nodes), dtype=np.uint32) |
|
|
|
queue = deque(n for n in self.nodes if len(n.ins) == 0 or 'dff' in n.kind.lower() or 'latch' in n.kind.lower()) |
|
|
|
start = set(n for n in self.nodes if len(n.ins) == 0 or 'dff' in n.kind.lower() or 'latch' in n.kind.lower()) |
|
|
|
|
|
|
|
queue = deque(start) |
|
|
|
while len(queue) > 0: |
|
|
|
while len(queue) > 0: |
|
|
|
n = queue.popleft() |
|
|
|
n = queue.popleft() |
|
|
|
for line in n.outs: |
|
|
|
for line in n.outs: |
|
|
|
if line is None: continue |
|
|
|
if line is None: continue |
|
|
|
succ = line.reader |
|
|
|
succ = line.reader |
|
|
|
visit_count[succ] += 1 |
|
|
|
visit_count[succ] += 1 |
|
|
|
if visit_count[succ] == len(succ.ins) and 'dff' not in succ.kind.lower() and 'latch' not in succ.kind.lower(): |
|
|
|
if visit_count[succ] == len(succ.ins) and succ not in start: |
|
|
|
queue.append(succ) |
|
|
|
queue.append(succ) |
|
|
|
yield n |
|
|
|
yield n |
|
|
|
|
|
|
|
|
|
|
|