Browse Source

Documentation, cleanup, multi-valued logic

devel
Stefan Holst 4 years ago
parent
commit
7bcfbf502b
  1. 9
      docs/datastructures.rst
  2. 38
      docs/parsers.rst
  3. 16
      docs/simulators.rst
  4. 28
      src/kyupy/__init__.py
  5. 39
      src/kyupy/bench.py
  6. 23
      src/kyupy/bittools.py
  7. 205
      src/kyupy/logic.py
  8. 7
      src/kyupy/logic_sim.py
  9. 3
      src/kyupy/packed_vectors.py
  10. 66
      src/kyupy/sdf.py
  11. 103
      src/kyupy/stil.py
  12. 37
      src/kyupy/verilog.py
  13. 327
      src/kyupy/wave_sim.py
  14. 317
      src/kyupy/wave_sim_cuda.py
  15. 2
      tests/test_bench.py
  16. 43
      tests/test_logic.py
  17. 2
      tests/test_logic_sim.py
  18. 6
      tests/test_sdf.py
  19. 2
      tests/test_stil.py
  20. 3
      tests/test_verilog.py
  21. 15
      tests/test_wave_sim.py

9
docs/datastructures.rst

@ -20,15 +20,10 @@ Circuit Graph - :mod:`kyupy.circuit`
.. autoclass:: kyupy.circuit.Circuit .. autoclass:: kyupy.circuit.Circuit
:members: :members:
M-Valued Logic - :mod:`kyupy.logic` Multi-Valued Logic - :mod:`kyupy.logic`
----------------------------------- ---------------------------------------
.. automodule:: kyupy.logic .. automodule:: kyupy.logic
:members: :members:
.. autoclass:: kyupy.logic.MVArray
:members:
.. autoclass:: kyupy.logic.BPArray
:members:

38
docs/parsers.rst

@ -1,12 +1,42 @@
Parsers Parsers
======= =======
bench KyuPy contains simple (and often incomplete) parsers for common file formats.
These parsers are tailored to the most common use-cases to keep the grammars and the code-base as simple as possible.
verilog Each of the modules exports a function ``parse()`` for parsing a string directly and a function
``load()`` for loading a file. Files with a '.gz' extension are uncompressed on-the-fly.
SDF
STIL Verilog - :mod:`kyupy.verilog`
------------------------------
.. automodule:: kyupy.verilog
:members: parse, load
Bench Format - :mod:`kyupy.bench`
---------------------------------
.. automodule:: kyupy.bench
:members: parse, load
Standard Test Interface Language - :mod:`kyupy.stil`
----------------------------------------------------
.. automodule:: kyupy.stil
:members: parse, load
.. autoclass:: kyupy.stil.StilFile
:members:
Standard Delay Format - :mod:`kyupy.sdf`
----------------------------------------
.. automodule:: kyupy.sdf
:members: parse, load
.. autoclass:: kyupy.sdf.DelayFile
:members:

16
docs/simulators.rst

@ -1,8 +1,20 @@
Simulators Simulators
========== ==========
Logic Sim Logic Simulation - :mod:`kyupy.logic_sim`
-----------------------------------------
Wave Sim .. autoclass:: kyupy.logic_sim.LogicSim
:members:
Timing Simulation - :mod:`kyupy.wave_sim`
-----------------------------------------
.. automodule:: kyupy.wave_sim
.. autoclass:: kyupy.wave_sim.WaveSim
:members:
.. autoclass:: kyupy.wave_sim.WaveSimCuda
:members:

28
src/kyupy/__init__.py

@ -1,10 +1,13 @@
"""This package provides tools for high-performance processing and validation """A package for processing and analysis of non-hierarchical gate-level VLSI designs.
of non-hierarchical VLSI circuits to aid rapid prototyping of research code
in the fields of VLSI test, diagnosis and reliability. It contains fundamental building blocks for research software in the fields of VLSI test, diagnosis and reliability.
""" """
import time import time
import importlib.util import importlib.util
import gzip
import numpy as np
class Log: class Log:
@ -97,12 +100,27 @@ if importlib.util.find_spec('numba') is not None:
list(numba.cuda.gpus) list(numba.cuda.gpus)
from numba import cuda from numba import cuda
except CudaSupportError: except CudaSupportError:
log.warn('Cuda unavailable. Falling back to pure python') log.warn('Cuda unavailable. Falling back to pure Python.')
cuda = MockCuda() cuda = MockCuda()
else: else:
numba = MockNumba() numba = MockNumba()
cuda = MockCuda() cuda = MockCuda()
log.warn('Numba unavailable. Falling back to pure python') log.warn('Numba unavailable. Falling back to pure Python.')
# Lookup table: entry ``i`` holds the number of 1-bits in the byte value ``i`` (0..255).
_pop_count_lut = np.asarray([bin(byte_value).count('1') for byte_value in range(256)])


def popcount(a):
    """Return the total number of set bits over all byte values in ``a``.

    ``a`` is used directly as an index into a 256-entry lookup table, so every
    value in it must be a valid index into that table.
    """
    bits_per_entry = _pop_count_lut[a]
    return np.sum(bits_per_entry)
def readtext(file):
    """Return the complete text content of ``file``.

    :param file: Either an object providing a ``read()`` method (its result is
        returned as-is) or a path. Paths whose string form ends in ``.gz`` are
        opened with :func:`gzip.open`, all others with :func:`open`; both in
        text mode.
    :return: Whatever ``read()`` yields for the given source.
    """
    if hasattr(file, 'read'):
        return file.read()
    # Pick the opener once; both accept the same (path, mode) arguments.
    opener = gzip.open if str(file).endswith('.gz') else open
    with opener(file, 'rt') as stream:
        return stream.read()

39
src/kyupy/bench.py

@ -1,5 +1,16 @@
"""A parser for the ISCAS89 benchmark format.
The ISCAS89 benchmark format (`.bench`-suffix) is a very simple textual description of gate-level netlists.
Historically it was first used in the
`ISCAS89 benchmark set <https://people.engr.ncsu.edu/brglez/CBL/benchmarks/ISCAS89/>`_.
Besides loading these benchmarks, this module is also useful for easily constructing simple circuits:
``c = bench.parse('input(x, y) output(a, o, n) a=and(x,y) o=or(x,y) n=not(x)')``.
"""
from lark import Lark, Transformer from lark import Lark, Transformer
from .circuit import Circuit, Node, Line from .circuit import Circuit, Node, Line
from . import readtext
class BenchTransformer(Transformer): class BenchTransformer(Transformer):
@ -21,8 +32,7 @@ class BenchTransformer(Transformer):
[Line(self.c, d, cell) for d in drivers] [Line(self.c, d, cell) for d in drivers]
def parse(bench): grammar = r"""
grammar = r"""
start: (statement)* start: (statement)*
statement: input | output | assignment statement: input | output | assignment
input: ("INPUT" | "input") parameters -> interface input: ("INPUT" | "input") parameters -> interface
@ -32,12 +42,23 @@ def parse(bench):
NAME: /[-_a-z0-9]+/i NAME: /[-_a-z0-9]+/i
%ignore ( /\r?\n/ | "#" /[^\n]*/ | /[\t\f ]/ )+ %ignore ( /\r?\n/ | "#" /[^\n]*/ | /[\t\f ]/ )+
""" """
name = None
if '(' not in str(bench): # No parentheses?: Assuming it is a file name.
name = str(bench).replace('.bench', '') def parse(text, name=None):
with open(bench, 'r') as f: """Parses the given ``text`` as ISCAS89 bench code.
text = f.read()
else: :param text: A string with bench code.
text = bench :param name: The name of the circuit. Circuit names are not included in bench descriptions.
:return: A :class:`Circuit` object.
"""
return Lark(grammar, parser="lalr", transformer=BenchTransformer(name)).parse(text) return Lark(grammar, parser="lalr", transformer=BenchTransformer(name)).parse(text)
def load(file, name=None):
    """Read ``file`` and parse its contents as ISCAS89 bench code.

    :param file: The file to be loaded (passed on to ``readtext``).
    :param name: The name of the circuit. If empty or ``None``, ``str(file)`` is used instead.
    :return: The object returned by :func:`parse` (a :class:`Circuit`).
    """
    text = readtext(file)
    circuit_name = name if name else str(file)
    return parse(text, name=circuit_name)

23
src/kyupy/bittools.py

@ -1,23 +0,0 @@
import numpy as np
import importlib.util
if importlib.util.find_spec('numba') is not None:
import numba
else:
from . import numba
print('Numba unavailable. Falling back to pure python')
# Lookup table: entry ``i`` holds the number of 1-bits in the byte value ``i`` (0..255).
_pop_count_lut = np.asarray([bin(byte_value).count('1') for byte_value in range(256)])


def popcount(a):
    """Return the total number of set bits over all byte values in ``a``.

    ``a`` is used directly as an index into a 256-entry lookup table, so every
    value in it must be a valid index into that table.
    """
    bits_per_entry = _pop_count_lut[a]
    return np.sum(bits_per_entry)
# Single-bit masks ordered from most to least significant bit:
# index 0 -> 0b10000000, ..., index 7 -> 0b00000001.
_bit_in_lut = np.array([2 ** x for x in range(7, -1, -1)], dtype='uint8')


@numba.njit
def bit_in(a, pos):
    """Test the bit at bit-position ``pos`` within ``a``.

    ``pos >> 3`` selects the element of ``a`` and ``pos & 7`` selects the bit
    inside that element, counted from the most significant bit.
    NOTE(review): presumably ``a`` is a 1-D ``uint8`` array of packed bits -- confirm with callers.

    :return: The element masked with the selected bit: zero if the bit is clear,
        the (non-zero) mask value if it is set. The result is not normalized to 0/1.
    """
    return a[pos >> 3] & _bit_in_lut[pos & 7]

205
src/kyupy/logic.py

@ -1,30 +1,23 @@
"""Data structures for 2-valued, 4-valued, and 8-valued logic computation. """This module contains definitions and data structures for 2-, 4-, and 8-valued logic operations.
Integer constants: ZERO, ONE, UNASSIGNED, UNKNOWN, RISING, FALLING, PPULSE, NPULSE. 8 logic values are defined as integer constants.
* The bits in the constants have the following meaning: * For 2-valued logic: ``ZERO`` and ``ONE``
* 4-valued logic adds: ``UNASSIGNED`` and ``UNKNOWN``
* 8-valued logic adds: ``RISE``, ``FALL``, ``PPULSE``, and ``NPULSE``.
The bits in these constants have the following meaning:
* bit 0: Final/settled binary value of a signal * bit 0: Final/settled binary value of a signal
* bit 1: Initial binary value of a signal * bit 1: Initial binary value of a signal
* bit 2: 1, if activity or transitions are present on a signal * bit 2: Activity or transitions are present on a signal
Special meaning is given to values where bits 0 and 1 differ, but activity is 0.
These values are interpreted as 'unknown' or 'unassigned' in 4-valued and 8-valued logic.
* 4-valued logic: 2 bits for storage, the third bit is implicitly 0
* 0 (0b00) : '0', 0, False, logic-0 (kyupy.logic.ZERO) Special meaning is given to values where bits 0 and 1 differ, but bit 2 (activity) is 0.
* 1 (0b01) : '-', None, unassigned (kyupy.logic.UNASSIGNED) These values are interpreted as ``UNKNOWN`` or ``UNASSIGNED`` in 4-valued and 8-valued logic.
* 2 (0b10) : 'X', unknown (kyupy.logic.UNKNOWN)
* 3 (0b11) : '1', 1, True, logic-1 (kyupy.logic.ONE)
* 8-valued logic: 3 bits for storage, adds the following 4 interpretations
* 4 (0b100) : 'P', positive pulse 0 -> 1 -> 0 (kyupy.logic.PPULSE)
* 5 (0b101) : 'R', rising transition (kyupy.logic.RISING)
* 6 (0b110) : 'F', falling transition (kyupy.logic.FALLING)
* 7 (0b111) : 'N', negative pulse 1 -> 0 -> 1 (kyupy.logic.NPULSE)
In general, 2-valued logic only considers bit 0, 4-valued logic considers bits 0 and 1, and 8-valued logic
considers all 3 bits.
The only exception is constant ``ONE=0b11`` which has two bits set for all logics including 2-valued logic.
""" """
import math import math
@ -32,16 +25,36 @@ from collections.abc import Iterable
import numpy as np import numpy as np
from . import numba
ZERO = 0b000 ZERO = 0b000
"""Integer constant ``0b000`` for logic-0. """Integer constant ``0b000`` for logic-0. ``'0'``, ``0``, ``False``, ``'L'``, and ``'l'`` are interpreted as ``ZERO``.
"""
UNKNOWN = 0b001
"""Integer constant ``0b001`` for unknown or conflict. ``'X'``, or any other value is interpreted as ``UNKNOWN``.
"""
UNASSIGNED = 0b010
"""Integer constant ``0b010`` for unassigned or high-impedance. ``'-'``, ``None``, ``'Z'``, and ``'z'`` are
interpreted as ``UNASSIGNED``.
""" """
UNASSIGNED = 0b001
UNKNOWN = 0b010
ONE = 0b011 ONE = 0b011
"""Integer constant ``0b011`` for logic-1. ``'1'``, ``1``, ``True``, ``'H'``, and ``'h'`` are interpreted as ``ONE``.
"""
PPULSE = 0b100 PPULSE = 0b100
RISING = 0b101 """Integer constant ``0b100`` for positive pulse, meaning initial and final values are 0, but there is some activity
FALLING = 0b110 on a signal. ``'P'``, ``'p'``, and ``'^'`` are interpreted as ``PPULSE``.
"""
RISE = 0b101
"""Integer constant ``0b101`` for a rising transition. ``'R'``, ``'r'``, and ``'/'`` are interpreted as ``RISE``.
"""
FALL = 0b110
"""Integer constant ``0b110`` for a falling transition. ``'F'``, ``'f'``, and ``'\\'`` are interpreted as ``FALL``.
"""
NPULSE = 0b111 NPULSE = 0b111
"""Integer constant ``0b111`` for negative pulse, meaning initial and final values are 1, but there is some activity
on a signal. ``'N'``, ``'n'``, and ``'v'`` are interpreted as ``NPULSE``.
"""
def interpret(value): def interpret(value):
@ -54,9 +67,9 @@ def interpret(value):
if value in [None, '-', 'Z', 'z']: if value in [None, '-', 'Z', 'z']:
return UNASSIGNED return UNASSIGNED
if value in ['R', 'r', '/']: if value in ['R', 'r', '/']:
return RISING return RISE
if value in ['F', 'f', '\\']: if value in ['F', 'f', '\\']:
return FALLING return FALL
if value in ['P', 'p', '^']: if value in ['P', 'p', '^']:
return PPULSE return PPULSE
if value in ['N', 'n', 'v']: if value in ['N', 'n', 'v']:
@ -64,6 +77,110 @@ def interpret(value):
return UNKNOWN return UNKNOWN
# Single-bit masks ordered from most to least significant bit:
# index 0 -> 0b10000000, ..., index 7 -> 0b00000001.
_bit_in_lut = np.array([2 ** x for x in range(7, -1, -1)], dtype='uint8')


@numba.njit
def bit_in(a, pos):
    """Test the bit at bit-position ``pos`` within ``a``.

    ``pos >> 3`` selects the element of ``a`` and ``pos & 7`` selects the bit
    inside that element, counted from the most significant bit.
    NOTE(review): presumably ``a`` is a 1-D ``uint8`` array of packed bits -- confirm with callers.

    :return: The element masked with the selected bit: zero if the bit is clear,
        the (non-zero) mask value if it is set. The result is not normalized to 0/1.
    """
    return a[pos >> 3] & _bit_in_lut[pos & 7]
def mv_cast(*args, m=8):
    """Turn every positional argument into an :class:`MVArray`.

    Arguments that already are :class:`MVArray` instances are passed through
    unchanged (same object, ``m`` is not applied to them). All others are
    converted with ``MVArray(arg, m=m)``.

    :return: A list with one :class:`MVArray` per argument, in argument order.
    """
    converted = []
    for arg in args:
        if not isinstance(arg, MVArray):
            arg = MVArray(arg, m=m)
        converted.append(arg)
    return converted
def mv_getm(*args):
    """Return the largest ``m`` among all :class:`MVArray` arguments.

    Arguments that are not :class:`MVArray` instances are ignored. If no
    :class:`MVArray` is given (or the largest ``m`` is falsy), 8 is returned.
    """
    candidates = [0]
    candidates.extend(arg.m for arg in args if isinstance(arg, MVArray))
    largest = max(candidates)
    return largest if largest else 8
def _mv_not(m, out, inp):
    """Write the element-wise m-valued NOT of ``inp`` into ``out``.

    Bits 0 and 1 of every element are flipped; bit 2 is left untouched.
    For ``m > 2``, elements that are ``UNKNOWN`` in ``inp`` are set to ``UNKNOWN``
    in ``out``. ``UNASSIGNED`` inputs end up as ``UNKNOWN`` because the bit-flip
    maps one onto the other.
    """
    np.bitwise_xor(inp, 0b11, out=out)  # this also exchanges UNASSIGNED <-> UNKNOWN
    if m > 2:
        # The mask is computed from ``inp`` after the XOR above has written ``out``.
        np.putmask(out, (inp == UNKNOWN), UNKNOWN)  # restore UNKNOWN
def mv_not(x1, out=None):
    """Return the element-wise multi-valued NOT of ``x1``.

    :param x1: An :class:`MVArray` or any value accepted by the :class:`MVArray` constructor.
    :param out: Optional :class:`MVArray` that receives the result. A new array with the
        shape of ``x1`` is allocated only if ``out`` is ``None``.
    :return: The :class:`MVArray` holding the result (``out`` itself if it was given).
    """
    m = mv_getm(x1)
    x1 = mv_cast(x1, m=m)[0]
    # Explicit None-check: a caller-supplied buffer must never be replaced just
    # because it happens to evaluate as falsy.
    if out is None:
        out = MVArray(x1.data.shape, m=m)
    _mv_not(m, out.data, x1.data)
    return out
def _mv_or(m, out, *ins):
    """Write the element-wise m-valued OR of all arrays in ``ins`` into ``out``.

    For ``m > 2``:

    * ``ONE`` wherever at least one input is ``ONE``,
    * ``UNKNOWN`` wherever no input is ``ONE`` but at least one input is
      ``UNKNOWN`` or ``UNASSIGNED``,
    * the bitwise OR of all inputs everywhere else.

    For ``m == 2`` the result is the plain bitwise OR of all inputs.
    """
    if m > 2:
        # Positions where any input is UNKNOWN or UNASSIGNED.
        any_unknown = (ins[0] == UNKNOWN) | (ins[0] == UNASSIGNED)
        for inp in ins[1:]: any_unknown |= (inp == UNKNOWN) | (inp == UNASSIGNED)
        # Positions where any input is the controlling value ONE.
        any_one = (ins[0] == ONE)
        for inp in ins[1:]: any_one |= (inp == ONE)

        out[...] = ZERO
        np.putmask(out, any_one, ONE)
        for inp in ins:
            # Positions already forced to ONE are excluded from the accumulation.
            np.bitwise_or(out, inp, out=out, where=~any_one)
        # An unknown input only matters if no ONE dominates the result.
        np.putmask(out, (any_unknown & ~any_one), UNKNOWN)
    else:
        out[...] = ZERO
        for inp in ins: np.bitwise_or(out, inp, out=out)
def mv_or(x1, x2, out=None):
    """Return the element-wise multi-valued OR of ``x1`` and ``x2``.

    :param x1: An :class:`MVArray` or any value accepted by the :class:`MVArray` constructor.
    :param x2: An :class:`MVArray` or any value accepted by the :class:`MVArray` constructor.
    :param out: Optional :class:`MVArray` that receives the result. A new array with the
        broadcast shape of both operands is allocated only if ``out`` is ``None``.
    :return: The :class:`MVArray` holding the result (``out`` itself if it was given).
    """
    m = mv_getm(x1, x2)
    x1, x2 = mv_cast(x1, x2, m=m)
    # Explicit None-check: a caller-supplied buffer must never be replaced just
    # because it happens to evaluate as falsy.
    if out is None:
        out = MVArray(np.broadcast(x1.data, x2.data).shape, m=m)
    _mv_or(m, out.data, x1.data, x2.data)
    return out
def _mv_and(m, out, *ins):
    """Write the element-wise m-valued AND of all arrays in ``ins`` into ``out``.

    For ``m > 2``:

    * ``ZERO`` wherever at least one input is ``ZERO``,
    * ``UNKNOWN`` wherever no input is ``ZERO`` but at least one input is
      ``UNKNOWN`` or ``UNASSIGNED``,
    * elsewhere bits 0 and 1 are the bitwise AND of the inputs; for ``m > 4``
      bit 2 is additionally the bitwise OR of the inputs' bit 2.

    For ``m == 2`` the result is the plain bitwise AND of all inputs.
    """
    if m > 2:
        # Positions where any input is UNKNOWN or UNASSIGNED.
        any_unknown = (ins[0] == UNKNOWN) | (ins[0] == UNASSIGNED)
        for inp in ins[1:]: any_unknown |= (inp == UNKNOWN) | (inp == UNASSIGNED)
        # Positions where any input is the controlling value ZERO.
        any_zero = (ins[0] == ZERO)
        for inp in ins[1:]: any_zero |= (inp == ZERO)

        out[...] = ONE
        np.putmask(out, any_zero, ZERO)
        for inp in ins:
            # ``| 0b100`` keeps bit 2 of ``out`` intact while AND-ing bits 0 and 1.
            np.bitwise_and(out, inp | 0b100, out=out, where=~any_zero)
            if m > 4: np.bitwise_or(out, inp & 0b100, out=out, where=~any_zero)
        # An unknown input only matters if no ZERO dominates the result.
        np.putmask(out, (any_unknown & ~any_zero), UNKNOWN)
    else:
        out[...] = ONE
        for inp in ins: np.bitwise_and(out, inp, out=out)
def mv_and(x1, x2, out=None):
    """Return the element-wise multi-valued AND of ``x1`` and ``x2``.

    :param x1: An :class:`MVArray` or any value accepted by the :class:`MVArray` constructor.
    :param x2: An :class:`MVArray` or any value accepted by the :class:`MVArray` constructor.
    :param out: Optional :class:`MVArray` that receives the result. A new array with the
        broadcast shape of both operands is allocated only if ``out`` is ``None``.
    :return: The :class:`MVArray` holding the result (``out`` itself if it was given).
    """
    m = mv_getm(x1, x2)
    x1, x2 = mv_cast(x1, x2, m=m)
    # Explicit None-check: a caller-supplied buffer must never be replaced just
    # because it happens to evaluate as falsy.
    if out is None:
        out = MVArray(np.broadcast(x1.data, x2.data).shape, m=m)
    _mv_and(m, out.data, x1.data, x2.data)
    return out
def _mv_xor(m, out, *ins):
    """Write the element-wise m-valued XOR of all arrays in ``ins`` into ``out``.

    For ``m > 2``:

    * ``UNKNOWN`` wherever at least one input is ``UNKNOWN`` or ``UNASSIGNED``
      (XOR has no controlling value that could mask an unknown input),
    * elsewhere bits 0 and 1 are the bitwise XOR of the inputs; for ``m > 4``
      bit 2 is additionally the bitwise OR of the inputs' bit 2.

    For ``m == 2`` the result is the plain bitwise XOR of all inputs.
    """
    if m > 2:
        # Positions where any input is UNKNOWN or UNASSIGNED.
        any_unknown = (ins[0] == UNKNOWN) | (ins[0] == UNASSIGNED)
        for inp in ins[1:]: any_unknown |= (inp == UNKNOWN) | (inp == UNASSIGNED)

        out[...] = ZERO
        for inp in ins:
            # Only bits 0 and 1 take part in the XOR; bit 2 is handled separately.
            np.bitwise_xor(out, inp & 0b011, out=out)
            if m > 4: np.bitwise_or(out, inp & 0b100, out=out)
        np.putmask(out, any_unknown, UNKNOWN)
    else:
        out[...] = ZERO
        for inp in ins: np.bitwise_xor(out, inp, out=out)
def mv_xor(x1, x2, out=None):
    """Return the element-wise multi-valued XOR of ``x1`` and ``x2``.

    :param x1: An :class:`MVArray` or any value accepted by the :class:`MVArray` constructor.
    :param x2: An :class:`MVArray` or any value accepted by the :class:`MVArray` constructor.
    :param out: Optional :class:`MVArray` that receives the result. A new array with the
        broadcast shape of both operands is allocated only if ``out`` is ``None``.
    :return: The :class:`MVArray` holding the result (``out`` itself if it was given).
    """
    m = mv_getm(x1, x2)
    x1, x2 = mv_cast(x1, x2, m=m)
    # Explicit None-check: a caller-supplied buffer must never be replaced just
    # because it happens to evaluate as falsy.
    if out is None:
        out = MVArray(np.broadcast(x1.data, x2.data).shape, m=m)
    _mv_xor(m, out.data, x1.data, x2.data)
    return out
class MVArray: class MVArray:
"""An n-dimensional array of m-valued logic values. """An n-dimensional array of m-valued logic values.
@ -71,11 +188,7 @@ class MVArray:
interpreting 2-valued, 4-valued, and 8-valued logic values. interpreting 2-valued, 4-valued, and 8-valued logic values.
Each logic value is stored as an uint8, value manipulations are cheaper than in BPArray. Each logic value is stored as an uint8, value manipulations are cheaper than in BPArray.
Axis convention (1 axis, a single vector/pattern): An MVArray always has 2 or more axes:
* Axis is PI/PO/FF position, the length of this axis is called "width".
Axis convention for 2 and more axes is consistent with BPArray:
* Second-last axis is PI/PO/FF position, the length of this axis is called "width". * Second-last axis is PI/PO/FF position, the length of this axis is called "width".
* Last axis is vector/pattern, the length of this axis is called "length". * Last axis is vector/pattern, the length of this axis is called "length".
@ -83,33 +196,43 @@ class MVArray:
""" """
def __init__(self, a, m=None): def __init__(self, a, m=None):
self.m = m or 4 self.m = m or 8
assert self.m in range(2, 256) assert self.m in [2, 4, 8]
# Try our best to interpret given a. # Try our best to interpret given a.
if isinstance(a, MVArray): if isinstance(a, MVArray):
self.data = a.data.copy() self.data = a.data.copy()
self.m = m or a.m self.m = m or a.m
elif isinstance(a, int) or isinstance(a, tuple): elif isinstance(a, int):
self.data = np.full((a, 1), UNASSIGNED, dtype=np.uint8)
elif isinstance(a, tuple):
self.data = np.full(a, UNASSIGNED, dtype=np.uint8) self.data = np.full(a, UNASSIGNED, dtype=np.uint8)
else: else:
if isinstance(a, str):
a = [a]
self.data = np.asarray(interpret(a), dtype=np.uint8) self.data = np.asarray(interpret(a), dtype=np.uint8)
if self.data.ndim > 1: if self.data.ndim == 1:
self.data = self.data[:, np.newaxis]
else:
self.data = np.moveaxis(self.data, -2, -1) self.data = np.moveaxis(self.data, -2, -1)
# Cast data to m-valued logic. # Cast data to m-valued logic.
if self.m == 2: if self.m == 2:
self.data[...] = ((self.data & 0b001) & ((self.data >> 1) & 0b001) | (self.data == RISING)) * ONE self.data[...] = ((self.data & 0b001) & ((self.data >> 1) & 0b001) | (self.data == RISE)) * ONE
elif self.m == 4: elif self.m == 4:
self.data[...] = (self.data & 0b011) & ((self.data != FALLING) * ONE) | ((self.data == RISING) * ONE) self.data[...] = (self.data & 0b011) & ((self.data != FALL) * ONE) | ((self.data == RISE) * ONE)
elif self.m == 8: elif self.m == 8:
self.data[...] = self.data & 0b111 self.data[...] = self.data & 0b111
self.length = 1 if self.data.ndim == 1 else self.data.shape[-1] self.length = self.data.shape[-1]
self.width = len(self.data) if self.data.ndim == 1 else self.data.shape[-2] self.width = self.data.shape[-2]
def __repr__(self): def __repr__(self):
return f'<MVArray length={self.length} width={self.width} m={self.m} bytes={self.data.nbytes}>' return f'<MVArray length={self.length} width={self.width} m={self.m} nbytes={self.data.nbytes}>'
def __str__(self):
chars = ["0", "X", "-", "1", "P", "R", "F", "N"]
return str([''.join(chars[v] for v in self.data[:, idx]) for idx in range(self.length)])
class BPArray: class BPArray:

7
src/kyupy/logic_sim.py

@ -1,9 +1,10 @@
import numpy as np import numpy as np
from . import packed_vectors from . import packed_vectors
class LogicSim: class LogicSim:
"""A bit-parallel naive combinational logic simulator supporting 1, 4, or 8-valued logics. """A bit-parallel naïve combinational simulator for 2-, 4-, or 8-valued logic.
""" """
def __init__(self, circuit, nvectors=1, vdim=1): def __init__(self, circuit, nvectors=1, vdim=1):
self.circuit = circuit self.circuit = circuit
@ -52,13 +53,13 @@ class LogicSim:
t = t.replace('__const0__', 'const0') t = t.replace('__const0__', 'const0')
t = t.replace('__const1__', 'const1') t = t.replace('__const1__', 'const1')
t = t.replace('tieh', 'const1') t = t.replace('tieh', 'const1')
# t = t.replace('xor', 'or').replace('xnor', 'nor')
fcts = [f for n, f in known_fct if t.startswith(n)] fcts = [f for n, f in known_fct if t.startswith(n)]
if len(fcts) < 1: if len(fcts) < 1:
raise ValueError(f'Unknown node kind {n.kind}') raise ValueError(f'Unknown node kind {n.kind}')
self.node_fct.append(fcts[0]) self.node_fct.append(fcts[0])
def assign(self, stimuli): def assign(self, stimuli):
"""Assign stimuli to the primary inputs and state-elements (flip-flops)."""
if isinstance(stimuli, packed_vectors.PackedVectors): if isinstance(stimuli, packed_vectors.PackedVectors):
stimuli = stimuli.bits stimuli = stimuli.bits
for (stim, node) in zip(stimuli, self.interface): for (stim, node) in zip(stimuli, self.interface):
@ -78,6 +79,7 @@ class LogicSim:
self.state_epoch[line.reader.index] = self.epoch self.state_epoch[line.reader.index] = self.epoch
def capture(self, responses): def capture(self, responses):
"""Capture the current values at the primary outputs and in the state-elements (flip-flops)."""
if isinstance(responses, packed_vectors.PackedVectors): if isinstance(responses, packed_vectors.PackedVectors):
responses = responses.bits responses = responses.bits
for (resp, node) in zip(responses, self.interface): for (resp, node) in zip(responses, self.interface):
@ -85,6 +87,7 @@ class LogicSim:
resp[...] = self.state[node.ins[0].index] resp[...] = self.state[node.ins[0].index]
def propagate(self): def propagate(self):
"""Propagate the input values towards the outputs (Perform all logic operations in topological order)."""
for node in self.circuit.topological_order(): for node in self.circuit.topological_order():
if self.state_epoch[node.index] != self.epoch: continue if self.state_epoch[node.index] != self.epoch: continue
inputs = [self.state[line.index] if line else self.zero for line in node.ins] inputs = [self.state[line.index] if line else self.zero for line in node.ins]

3
src/kyupy/packed_vectors.py

@ -1,5 +1,6 @@
import numpy as np import numpy as np
from .bittools import popcount, bit_in from . import popcount
from .logic import bit_in
class PackedVectors: class PackedVectors:

66
src/kyupy/sdf.py

@ -1,14 +1,28 @@
"""A simple and incomplete parser for the Standard Delay Format (SDF).
The main purpose of this parser is to extract pin-to-pin delay and interconnect delay information from SDF files.
Sophisticated timing specifications (timing checks, conditional delays, etc.) are currently not supported.
The functions :py:func:`load` and :py:func:`parse` return an intermediate representation (:class:`DelayFile` object).
Call :py:func:`DelayFile.annotation` to match the intermediate representation to a given circuit.
"""
from collections import namedtuple
import numpy as np import numpy as np
from lark import Lark, Transformer from lark import Lark, Transformer
from collections import namedtuple
from . import log from . import log, readtext
import gzip
Interconnect = namedtuple('Interconnect', ['orig', 'dest', 'r', 'f']) Interconnect = namedtuple('Interconnect', ['orig', 'dest', 'r', 'f'])
IOPath = namedtuple('IOPath', ['ipin', 'opin', 'r', 'f']) IOPath = namedtuple('IOPath', ['ipin', 'opin', 'r', 'f'])
class DelayFile: class DelayFile:
"""An intermediate representation of an SDF file.
"""
def __init__(self, name, cells): def __init__(self, name, cells):
self.name = name self.name = name
if None in cells: if None in cells:
@ -22,26 +36,26 @@ class DelayFile:
'\n'.join(str(i) for i in self.interconnects) '\n'.join(str(i) for i in self.interconnects)
def annotation(self, circuit, pin_index_f, dataset=1, interconnect=True, ffdelays=True): def annotation(self, circuit, pin_index_f, dataset=1, interconnect=True, ffdelays=True):
""" """Constructs an 3-dimensional ndarray with timing data for each line in ``circuit``.
Constructs an 3-dimensional array with timing data for each line in `circuit`.
Dimension 1 of the returned array is the line index.
Dimension 2 is the type of timing data: 0:`delay`, 1:`pulse rejection limit`.
Dimension 3 is the polarity at the output of the reading node: 0:`rising`, 1:`falling`.
The polarity for pulse rejection is determined by the latter transition of the pulse.
E.g., timing[42,1,0] is the rejection limit of a negative pulse at the output of the reader of line 42.
An IOPATH delay for a node is annotated to the line connected to the input pin specified in the IOPATH. An IOPATH delay for a node is annotated to the line connected to the input pin specified in the IOPATH.
Currently, only ABSOLUTE IOPATH and INTERCONNECT delays are supported. Currently, only ABSOLUTE IOPATH and INTERCONNECT delays are supported.
Pulse rejection limits are derived from absolute delays, explicit declarations (PATHPULSE etc.) are ignored. Pulse rejection limits are derived from absolute delays, explicit declarations (PATHPULSE etc.) are ignored.
:param circuit:
:param pin_index_f:
:param ffdelays: :param ffdelays:
:param interconnect: :param interconnect:
:param pin_index_f:
:param circuit:
:type dataset: int or tuple :type dataset: int or tuple
:return: A 3-dimensional ndarray with timing data.
* Axis 0: line index.
* Axis 1: type of timing data: 0=`delay`, 1=`pulse rejection limit`.
* Axis 2: The polarity of the output transition of the reading node: 0=`rising`, 1=`falling`.
The polarity for pulse rejection is determined by the latter transition of the pulse.
E.g., timing[42,1,0] is the rejection limit of a negative pulse at the output of the reader of line 42.
""" """
def select_del(_delvals, idx): def select_del(_delvals, idx):
if type(dataset) is tuple: if type(dataset) is tuple:
@ -170,8 +184,7 @@ class SdfTransformer(Transformer):
return DelayFile(name, cells) return DelayFile(name, cells)
def parse(sdf): grammar = r"""
grammar = r"""
start: "(DELAYFILE" ( "(SDFVERSION" _NOB ")" start: "(DELAYFILE" ( "(SDFVERSION" _NOB ")"
| "(DESIGN" "\"" NAME "\"" ")" | "(DESIGN" "\"" NAME "\"" ")"
| "(DATE" _NOB ")" | "(DATE" _NOB ")"
@ -201,13 +214,16 @@ def parse(sdf):
%ignore ( /\r?\n/ | COMMENT )+ %ignore ( /\r?\n/ | COMMENT )+
%ignore /[\t\f ]+/ %ignore /[\t\f ]+/
""" """
if '\n' not in str(sdf): # One line?: Assuming it is a file name.
if str(sdf).endswith('.gz'):
with gzip.open(sdf, 'rt') as f: def parse(text):
text = f.read() """Parses the given ``text`` and returns a :class:`DelayFile` object."""
else:
with open(sdf, 'r') as f:
text = f.read()
else:
text = str(sdf)
return Lark(grammar, parser="lalr", transformer=SdfTransformer()).parse(text) return Lark(grammar, parser="lalr", transformer=SdfTransformer()).parse(text)
def load(file):
    """Read ``file`` and parse its contents into a :class:`DelayFile` object.

    The file content is obtained via ``readtext``, so the given file may be gzip compressed.
    """
    text = readtext(file)
    return parse(text)

103
src/kyupy/stil.py

@ -1,9 +1,20 @@
"""A simple and incomplete parser for the Standard Test Interface Language (STIL).
The main purpose of this parser is to load scan pattern sets from STIL files.
It supports only a very limited subset of STIL.
The functions :py:func:`load` and :py:func:`parse` return an intermediate representation (:class:`StilFile` object).
Call :py:func:`StilFile.tests4v`, :py:func:`StilFile.tests8v`, or :py:func:`StilFile.responses4v` to
obtain the appropriate vector sets.
"""
from lark import Lark, Transformer from lark import Lark, Transformer
from collections import namedtuple from collections import namedtuple
import re import re
import gzip
from .packed_vectors import PackedVectors from .packed_vectors import PackedVectors
from .logic_sim import LogicSim from .logic_sim import LogicSim
from . import readtext
Call = namedtuple('Call', ['name', 'parameters']) Call = namedtuple('Call', ['name', 'parameters'])
@ -11,6 +22,8 @@ ScanPattern = namedtuple('ScanPattern', ['load', 'launch', 'capture', 'unload'])
class StilFile: class StilFile:
"""An intermediate representation of a STIL file.
"""
def __init__(self, version, signal_groups, scan_chains, calls): def __init__(self, version, signal_groups, scan_chains, calls):
self.version = version self.version = version
self.signal_groups = signal_groups self.signal_groups = signal_groups
@ -21,7 +34,7 @@ class StilFile:
self.patterns = [] self.patterns = []
launch = {} launch = {}
capture = {} capture = {}
load = {} sload = {}
for call in self.calls: for call in self.calls:
if call.name == 'load_unload': if call.name == 'load_unload':
unload = {} unload = {}
@ -29,13 +42,13 @@ class StilFile:
if so_port in call.parameters: if so_port in call.parameters:
unload[so_port] = call.parameters[so_port].replace('\n', '') unload[so_port] = call.parameters[so_port].replace('\n', '')
if len(launch) > 0: if len(launch) > 0:
self.patterns.append(ScanPattern(load, launch, capture, unload)) self.patterns.append(ScanPattern(sload, launch, capture, unload))
capture = {} capture = {}
launch = {} launch = {}
load = {} sload = {}
for si_port in self.si_ports: for si_port in self.si_ports:
if si_port in call.parameters: if si_port in call.parameters:
load[si_port] = call.parameters[si_port].replace('\n', '') sload[si_port] = call.parameters[si_port].replace('\n', '')
if call.name.endswith('_launch') or call.name.endswith('_capture'): if call.name.endswith('_launch') or call.name.endswith('_capture'):
if len(launch) == 0: if len(launch) == 0:
launch = dict((k, v.replace('\n', '')) for k, v in call.parameters.items()) launch = dict((k, v.replace('\n', '')) for k, v in call.parameters.items())
@ -73,8 +86,12 @@ class StilFile:
scan_inversions[chain[-1]] = scan_out_inversion scan_inversions[chain[-1]] = scan_out_inversion
return interface, pi_map, po_map, scan_maps, scan_inversions return interface, pi_map, po_map, scan_maps, scan_inversions
def tests(self, c): def tests4v(self, circuit):
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(c) """Assembles and returns a scan test pattern set in 4-valued logic for given circuit.
This function assumes a static (stuck-at fault) test.
"""
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(circuit)
tests = PackedVectors(len(self.patterns), len(interface), 2) tests = PackedVectors(len(self.patterns), len(interface), 2)
for i, p in enumerate(self.patterns): for i, p in enumerate(self.patterns):
for si_port in self.si_ports.keys(): for si_port in self.si_ports.keys():
@ -82,15 +99,21 @@ class StilFile:
tests.set_values(i, p.launch['_pi'], pi_map) tests.set_values(i, p.launch['_pi'], pi_map)
return tests return tests
def tests8v(self, c): def tests8v(self, circuit):
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(c) """Assembles and returns a scan test pattern set in 8-valued logic for given circuit.
This function assumes a launch-on-capture (LoC) delay test.
It performs a logic simulation to obtain the first capture pattern (the one that launches the
delay test) and assembles the test pattern set from pairs of initialization and launch patterns.
"""
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(circuit)
init = PackedVectors(len(self.patterns), len(interface), 2) init = PackedVectors(len(self.patterns), len(interface), 2)
for i, p in enumerate(self.patterns): for i, p in enumerate(self.patterns):
# init.set_values(i, '0' * len(interface)) # init.set_values(i, '0' * len(interface))
for si_port in self.si_ports.keys(): for si_port in self.si_ports.keys():
init.set_values(i, p.load[si_port], scan_maps[si_port], scan_inversions[si_port]) init.set_values(i, p.load[si_port], scan_maps[si_port], scan_inversions[si_port])
init.set_values(i, p.launch['_pi'], pi_map) init.set_values(i, p.launch['_pi'], pi_map)
sim4v = LogicSim(c, len(init), 2) sim4v = LogicSim(circuit, len(init), 2)
sim4v.assign(init) sim4v.assign(init)
sim4v.propagate() sim4v.propagate()
launch = init.copy() launch = init.copy()
@ -105,8 +128,9 @@ class StilFile:
return PackedVectors.from_pair(init, launch) return PackedVectors.from_pair(init, launch)
def responses(self, c): def responses4v(self, circuit):
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(c) """Assembles and returns a scan test response pattern set in 4-valued logic for given circuit."""
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(circuit)
resp = PackedVectors(len(self.patterns), len(interface), 2) resp = PackedVectors(len(self.patterns), len(interface), 2)
for i, p in enumerate(self.patterns): for i, p in enumerate(self.patterns):
if len(p.capture) > 0: if len(p.capture) > 0:
@ -162,8 +186,7 @@ class StilTransformer(Transformer):
return StilFile(float(args[0]), self._signal_groups, self._scan_chains, self._calls) return StilFile(float(args[0]), self._signal_groups, self._scan_chains, self._calls)
def parse(stil): grammar = r"""
grammar = r"""
start: "STIL" FLOAT _ignore _block* start: "STIL" FLOAT _ignore _block*
_block: signal_groups | scan_structures | pattern _block: signal_groups | scan_structures | pattern
| "Header" _ignore | "Header" _ignore
@ -203,50 +226,16 @@ def parse(stil):
_NOB: /[^{}]+/ _NOB: /[^{}]+/
%ignore ( /\r?\n/ | "//" /[^\n]*/ | /[\t\f ]/ )+ %ignore ( /\r?\n/ | "//" /[^\n]*/ | /[\t\f ]/ )+
""" """
if '\n' not in str(stil): # One line?: Assuming it is a file name.
if str(stil).endswith('.gz'):
with gzip.open(stil, 'rt') as f: def parse(text):
text = f.read() """Parses the given ``text`` and returns a :class:`StilFile` object."""
else:
with open(stil, 'r') as f:
text = f.read()
else:
text = str(stil)
return Lark(grammar, parser="lalr", transformer=StilTransformer()).parse(text) return Lark(grammar, parser="lalr", transformer=StilTransformer()).parse(text)
def extract_scan_pattens(stil_calls): def load(file):
pats = [] """Parses the contents of ``file`` and returns a :class:`StilFile` object.
pi = None
scan_in = None
for call in stil_calls:
if call.name == 'load_unload':
scan_out = call.parameters.get('Scan_Out')
if scan_out is not None:
scan_out = scan_out.replace('\n', '')
if pi: pats.append(ScanPattern(scan_in, pi, None, scan_out))
scan_in = call.parameters.get('Scan_In')
if scan_in is not None:
scan_in = scan_in.replace('\n', '')
if call.name == 'allclock_capture':
pi = call.parameters['_pi'].replace('\n', '')
return pats
def match_patterns(stil_file, pats, interface):
intf_pos = dict([(n.name, i) for i, n in enumerate(interface)])
pi_map = [intf_pos[n] for n in stil_file.signal_groups['_pi']]
scan_map = [intf_pos[re.sub(r'b..\.', '', n)] for n in reversed(stil_file.scan_chains['1'])]
# print(scan_map)
tests = PackedVectors(len(pats), len(interface), 2)
for i, p in enumerate(pats):
tests.set_values(i, p.scan_in, scan_map)
tests.set_values(i, p.pi, pi_map)
resp = PackedVectors(len(pats), len(interface), 2)
for i, p in enumerate(pats):
resp.set_values(i, p.pi, pi_map)
resp.set_values(i, p.scan_out, scan_map)
return tests, resp
The given file may be gzip compressed.
"""
return parse(readtext(file))

37
src/kyupy/verilog.py

@ -1,8 +1,14 @@
"""A simple and incomplete parser for Verilog files.
The main purpose of this parser is to load synthesized, non-hierarchical (flat) gate-level netlists.
It supports only a very limited subset of Verilog.
"""
from collections import namedtuple from collections import namedtuple
import gzip
from lark import Lark, Transformer from lark import Lark, Transformer
from . import readtext
from .circuit import Circuit, Node, Line from .circuit import Circuit, Node, Line
from .saed import pin_index, pin_is_output from .saed import pin_index, pin_is_output
@ -152,22 +158,21 @@ grammar = """
""" """
def loads(s, *, branchforks=False): def parse(text, *, branchforks=False):
return Lark(grammar, parser="lalr", transformer=VerilogTransformer(branchforks)).parse(s) """Parses the given ``text`` as Verilog code.
:param text: A string with Verilog code.
:param branchforks: If set to ``True``, the returned circuit will include additional `forks` on each fanout branch.
These forks are needed to correctly annotate interconnect delays
(see :py:func:`kyupy.sdf.DelayFile.annotation`).
:return: A :class:`~kyupy.circuit.Circuit` object.
"""
return Lark(grammar, parser="lalr", transformer=VerilogTransformer(branchforks)).parse(text)
def load(fp, *, branchforks=False):
return loads(fp.read(), branchforks=branchforks)
def load(file, *args, **kwargs):
"""Parses the contents of ``file`` as Verilog code.
def parse(verilog, branchforks=False): The given file may be gzip compressed. Takes the same keyword arguments as :py:func:`parse`.
if '\n' not in str(verilog): # One line?: Assuming it is a file name. """
if str(verilog).endswith('.gz'): return parse(readtext(file), *args, **kwargs)
with gzip.open(verilog, 'rt') as f:
text = f.read()
else:
with open(verilog, 'r') as f:
text = f.read()
else:
text = str(verilog)
return loads(text, branchforks=branchforks)

327
src/kyupy/wave_sim.py

@ -1,8 +1,24 @@
"""High-Throughput combinational logic timing simulators.
These simulators work similarly to :py:class:`kyupy.logic_sim.LogicSim`.
They propagate values through the combinational circuit from (pseudo) primary inputs to (pseudo) primary outputs.
Instead of propagating logic values, these simulators propagate signal histories (waveforms).
They are designed to run many simulations in parallel. While their latencies are quite high, they achieve
high throughput.
The simulators are not event-based and are not capable of simulating sequential circuits directly.
Two simulators are available: :py:class:`WaveSim` runs on the CPU, and the derived class
:py:class:`WaveSimCuda` runs on the GPU.
"""
import math import math
from bisect import bisect, insort_left from bisect import bisect, insort_left
import numpy as np import numpy as np
from . import numba from . import numba
from . import cuda
TMAX = np.float32(2 ** 127) # almost np.PINF for 32-bit floating point values TMAX = np.float32(2 ** 127) # almost np.PINF for 32-bit floating point values
@ -77,6 +93,7 @@ class Heap:
class WaveSim: class WaveSim:
"""A waveform-based combinational logic timing simulator."""
def __init__(self, circuit, timing, sims=8, wavecaps=16, strip_forks=False, keep_waveforms=True): def __init__(self, circuit, timing, sims=8, wavecaps=16, strip_forks=False, keep_waveforms=True):
self.circuit = circuit self.circuit = circuit
self.sims = sims self.sims = sims
@ -519,3 +536,313 @@ def wave_eval(op, state, sat, st_idx, line_times, sd=0.0, seed=0):
state[z_mem + z_cur, st_idx] = a if a > b else b # propagate overflow flags by storing biggest TMAX from input state[z_mem + z_cur, st_idx] = a if a > b else b # propagate overflow flags by storing biggest TMAX from input
return overflows return overflows
class WaveSimCuda(WaveSim):
    """A GPU-accelerated waveform-based combinational logic timing simulator.

    Extends :py:class:`WaveSim` by keeping device copies (``d_*`` attributes) of the
    simulator arrays and by running assignment, propagation, capture and re-assignment
    as CUDA kernels.
    """

    def __init__(self, circuit, timing, sims=8, wavecaps=16, strip_forks=False, keep_waveforms=True):
        """Initializes the base simulator and copies its arrays to the device.

        All arguments are forwarded unchanged to :py:class:`WaveSim`.
        """
        super().__init__(circuit, timing, sims, wavecaps, strip_forks, keep_waveforms)

        # Bit-packed input data: three bit-planes per interface position, one bit per simulation.
        self.tdata = np.zeros((len(self.interface), 3, (sims - 1) // 8 + 1), dtype='uint8')

        self.d_state = cuda.to_device(self.state)
        self.d_sat = cuda.to_device(self.sat)
        self.d_ops = cuda.to_device(self.ops)
        self.d_timing = cuda.to_device(self.timing)
        self.d_tdata = cuda.to_device(self.tdata)
        self.d_cdata = cuda.to_device(self.cdata)

        # Threads per block as (simulations, interface positions / operations).
        self._block_dim = (32, 16)

    def get_line_delay(self, line, polarity):
        """Returns the entry ``[line, 0, polarity]`` of the device-side timing array."""
        return self.d_timing[line, 0, polarity]

    def set_line_delay(self, line, polarity, delay):
        """Writes ``delay`` to the entry ``[line, 0, polarity]`` of the device-side timing array."""
        self.d_timing[line, 0, polarity] = delay

    def assign(self, vectors, time=0.0, offset=0):
        """Copies bit-packed input data to the device and builds the input waveforms from it.

        :param vectors: An object providing the packed data as ``vectors.bits`` and a ``vdim`` attribute.
        :param time: The time written for transitions generated by :py:func:`assign_kernel`.
        :param offset: Index of the first vector to use. Must be a multiple of 8 because the
            data is copied byte-wise.
        """
        assert (offset % 8) == 0
        byte_offset = offset // 8
        assert byte_offset < vectors.bits.shape[-1]
        pdim = min(vectors.bits.shape[-1] - byte_offset, self.tdata.shape[-1])

        self.tdata[..., 0:pdim] = vectors.bits[..., byte_offset:pdim + byte_offset]
        if vectors.vdim == 1:
            # Only one plane was supplied: invert plane 1 and clear plane 2.
            # NOTE(review): presumably this makes assign_kernel emit no transition at `time` - confirm.
            self.tdata[:, 1, 0:pdim] = ~self.tdata[:, 1, 0:pdim]
            self.tdata[:, 2, 0:pdim] = 0
        cuda.to_device(self.tdata, to=self.d_tdata)

        grid_dim = self._grid_dim(self.sims, len(self.interface))
        assign_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppi_offset,
                                                 len(self.interface), self.d_tdata, time)

    def _grid_dim(self, x, y):
        """Returns the number of blocks needed to cover ``x`` by ``y`` threads with ``self._block_dim``."""
        gx = math.ceil(x / self._block_dim[0])
        gy = math.ceil(y / self._block_dim[1])
        return gx, gy

    def propagate(self, sims=None, sd=0.0, seed=1):
        """Launches :py:func:`wave_kernel` once per level, in level order.

        :param sims: Number of simulations to propagate. Defaults to, and is capped at, ``self.sims``.
        :param sd: Passed to the kernel, where it scales the random variation of the line times.
        :param seed: Passed to the kernel as seed for its random numbers.
        """
        if sims is None:
            sims = self.sims
        else:
            sims = min(sims, self.sims)
        for op_start, op_stop in zip(self.level_starts, self.level_stops):
            grid_dim = self._grid_dim(sims, op_stop - op_start)
            # Pass the device copy of sat: a host array would be transferred to the device
            # and back again on every single launch.
            wave_kernel[grid_dim, self._block_dim](self.d_ops, op_start, op_stop, self.d_state, self.d_sat, int(0),
                                                   sims, self.d_timing, sd, seed)
        cuda.synchronize()
        self.lst_eat_valid = False

    def wave(self, line, vector):
        """Returns the device-side waveform memory of ``line`` for simulation ``vector``.

        Returns ``None`` if ``line`` is negative or has no waveform memory (negative ``sat`` entry).
        """
        if line < 0:
            return None
        mem, wcap, _ = self.sat[line]
        if mem < 0:
            return None
        return self.d_state[mem:mem + wcap, vector]

    def capture(self, time=TMAX, sd=0, seed=1, cdata=None, offset=0):
        """Runs :py:func:`capture_kernel` and copies the result back into ``self.cdata``.

        :param time: The capture time handed to the kernel.
        :param sd: The kernel receives ``sd * sqrt(2)``; values > 0 enable its probabilistic capture.
        :param seed: Seed for the kernel's random numbers.
        :param cdata: Optional array; if given, captured data is also copied into it starting at
            index ``offset`` of its second axis.
        :param offset: Start index within ``cdata``.
        :return: ``self.cdata``.
        """
        grid_dim = self._grid_dim(self.sims, len(self.interface))
        capture_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppo_offset,
                                                  self.d_cdata, time, sd * math.sqrt(2), seed)
        self.cdata[...] = self.d_cdata
        if cdata is not None:
            assert offset < cdata.shape[1]
            cap_dim = min(cdata.shape[1] - offset, self.sims)
            cdata[:, offset:cap_dim + offset] = self.cdata[:, 0:cap_dim]
        self.lst_eat_valid = True
        return self.cdata

    def reassign(self, time=0.0):
        """Runs :py:func:`reassign_kernel`, which rebuilds the input waveforms from the captured data.

        :param time: The time written for transitions generated by the kernel.
        """
        grid_dim = self._grid_dim(self.sims, len(self.interface))
        reassign_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppi_offset, self.ppo_offset,
                                                   self.d_cdata, time)
        cuda.synchronize()

    def wavecaps(self):
        """Runs :py:func:`wavecaps_kernel`, copies ``sat`` back to the host and returns its column 2."""
        # NOTE(review): the launch covers len(circuit.lines) threads while the kernel guards on
        # len(sat) - confirm that no sat rows beyond the circuit lines need to be updated.
        gx = math.ceil(len(self.circuit.lines) / 512)
        wavecaps_kernel[gx, 512](self.d_state, self.d_sat, self.sims)
        self.sat[...] = self.d_sat
        return self.sat[..., 2]
@cuda.jit()
def wavecaps_kernel(state, sat, sims):
    """Stores in ``sat[idx, 2]`` the number of waveform entries in use for each ``sat`` row.

    One thread handles one row of ``sat``. For each of the first ``sims`` simulations it walks
    the waveform up to and including its terminator (an entry ``>= TMAX``) and remembers the
    highest index visited; the stored value is that index plus one.
    """
    row = cuda.grid(1)
    if row >= len(sat):
        return
    base, capacity, _ = sat[row]
    if base < 0:
        return

    highest = 0
    for sim in range(sims):
        for slot in range(capacity):
            if slot > highest:
                highest = slot
            if state[base + slot, sim] >= TMAX:
                break

    sat[row, 2] = highest + 1
@cuda.jit()
def reassign_kernel(state, sat, ppi_offset, ppo_offset, cdata, ppi_time):
    """Rewrites each input waveform so that it ends at the value captured in ``cdata[..., 1]``.

    One thread handles one (simulation, interface position) pair. The value currently reached
    by the input waveform is derived from the parity of its toggles. The new waveform starts
    at that value and, if the captured value differs, toggles once at ``ppi_time``.
    """
    sim, pos = cuda.grid(2)
    if sim >= state.shape[-1]:
        return
    if ppo_offset + pos >= len(sat):
        return

    ppo_mem, _, _ = sat[ppo_offset + pos]
    ppi_mem, ppi_len, _ = sat[ppi_offset + pos]
    if ppo_mem < 0 or ppi_mem < 0:
        return

    captured = int(cdata[pos, sim, 1])

    # Each entry before the terminator flips the value, starting from 0.
    current = int(0)
    for k in range(ppi_len):
        if state[ppi_mem + k, sim] >= TMAX:
            break
        current ^= 1

    # make new waveform at PPI
    cursor = 0
    if current:
        state[ppi_mem + cursor, sim] = TMIN
        cursor += 1
    if current != captured:
        state[ppi_mem + cursor, sim] = ppi_time
        cursor += 1
    state[ppi_mem + cursor, sim] = TMAX
@cuda.jit()
def capture_kernel(state, sat, ppo_offset, cdata, time, s_sqrt2, seed):
    """Evaluates the waveforms at the outputs and writes seven values per output to ``cdata``.

    One thread handles one (simulation, interface position) pair. Written entries of
    ``cdata[y, vector, :]``:

    * 0: the accumulated erf terms if ``s_sqrt2 > 0``, otherwise the same as entry 1
      (presumably the probability of capturing a 1 - confirm),
    * 1: the captured value,
    * 2: the value after all transitions,
    * 3: whether entries 1 and 2 differ,
    * 4: the smallest transition time above ``TMIN``,
    * 5: the largest transition time above ``TMIN``,
    * 6: 1 if the waveform is terminated by ``TMAX_OVL``.
    """
    vector, y = cuda.grid(2)
    if ppo_offset + y >= len(sat):
        return
    line, tdim, _ = sat[ppo_offset + y]
    if line < 0 or vector >= state.shape[-1]:
        return

    sign = 0.5
    prob = 0.0
    earliest = TMAX
    latest = TMIN
    overflow = 0
    sampled = int(0)
    settled = int(0)

    for k in range(tdim):
        t = state[line + k, vector]
        if t >= TMAX:
            if t == TMAX_OVL:
                overflow = 1
            break
        sign = -sign
        settled ^= 1
        if t < time:
            sampled ^= 1
        if t <= TMIN:
            # an initial-value marker, not a real transition
            continue
        if s_sqrt2 > 0:
            prob += sign * (1 + math.erf((t - time) / s_sqrt2))
        earliest = min(earliest, t)
        latest = max(latest, t)

    if s_sqrt2 > 0:
        if sign < 0:
            prob += 1
        if prob >= 0.99:
            sampled = 1
        elif prob > 0.01:
            # draw a pseudo-random number in [0, 1) that depends on seed, vector and y
            seed = (seed << 4) + (vector << 20) + (y << 1)
            seed = int(0xDEECE66D) * seed + 0xB
            seed = int(0xDEECE66D) * seed + 0xB
            rnd = float((seed >> 8) & 0xffffff) / float(1 << 24)
            sampled = rnd < prob
        else:
            sampled = 0
    else:
        prob = sampled

    cdata[y, vector, 0] = prob
    cdata[y, vector, 1] = sampled
    cdata[y, vector, 2] = settled
    cdata[y, vector, 3] = (sampled != settled)
    cdata[y, vector, 4] = earliest
    cdata[y, vector, 5] = latest
    cdata[y, vector, 6] = overflow
@cuda.jit()
def assign_kernel(state, sat, ppi_offset, intf_len, tdata, time):
    """Builds the input waveforms from the bit-packed data in ``tdata``.

    One thread handles one (simulation, interface position) pair. The bit of a simulation is
    found in byte ``vector // 8``, most significant bit first. If the bit in plane 0 is set,
    the waveform starts with a ``TMIN`` entry. If the bit in plane 2 is set and the bits in
    planes 0 and 1 are equal, an entry at ``time`` follows. ``TMAX`` terminates the waveform.
    """
    vector, y = cuda.grid(2)
    if y >= intf_len:
        return
    line = sat[ppi_offset + y, 0]
    if line < 0 or vector >= state.shape[-1]:
        return

    byte = vector // 8
    mask = np.uint8(1 << (7 - (vector % 8)))
    bit0 = tdata[y, 0, byte] & mask
    bit1 = tdata[y, 1, byte] & mask
    bit2 = tdata[y, 2, byte] & mask

    cursor = 0
    if bit0:
        state[line + cursor, vector] = TMIN
        cursor += 1
    if bit2 and (bit0 == bit1):
        state[line + cursor, vector] = time
        cursor += 1
    state[line + cursor, vector] = TMAX
@cuda.jit(device=True)
def rand_gauss_dev(seed, sd):
    """Returns a pseudo-random factor around 1.0 whose spread is scaled by ``sd``.

    For ``sd <= 0`` the result is exactly 1.0. Otherwise, twelve pseudo-random numbers from
    [0, 1) are summed and shifted by -6 (an approximation of a standard normal sample),
    scaled by ``sd``, and redrawn until the deviation is within +/-0.5.
    The result therefore lies in [0.5, 1.5].
    """
    if sd <= 0.0:
        return 1.0
    limit = 0.5
    while True:
        total = -6.0
        for _ in range(12):
            seed = int(0xDEECE66D) * seed + 0xB
            total += float((seed >> 8) & 0xffffff) / float(1 << 24)
        total *= sd
        if abs(total) <= limit:
            return total + 1.0
@cuda.jit()
def wave_kernel(ops, op_start, op_stop, state, sat, st_start, st_stop, line_times, sd, seed):
    """Computes the output waveform of one operation for one simulation.

    Thread (x, y) handles simulation ``st_start + x`` and operation ``op_start + y``.
    Each row of ``ops`` holds a look-up table and the ``sat`` indices of the output (z)
    and the two inputs (a, b). The input waveforms are merged in time order; whenever the
    look-up table yields a new output value, a toggle is added to the output waveform,
    or the previous toggle is removed again if the resulting pulse is too short.
    """
    x, y = cuda.grid(2)
    st_idx = st_start + x
    op_idx = op_start + y
    if st_idx >= st_stop: return
    if op_idx >= op_stop: return
    lut = ops[op_idx, 0]
    z_idx = ops[op_idx, 1]
    a_idx = ops[op_idx, 2]
    b_idx = ops[op_idx, 3]
    overflows = int(0)

    # per-thread seed, so that each output/simulation pair gets its own random numbers
    _seed = (seed << 4) + (z_idx << 20) + (st_idx << 1)

    a_mem = sat[a_idx, 0]
    b_mem = sat[b_idx, 0]
    z_mem, z_cap, _ = sat[z_idx]

    a_cur = int(0)
    b_cur = int(0)
    # bit 0 of the look-up table is the output value for inputs a=0, b=0;
    # z_cur is both the write position in z and (in its lowest bit) the current output value
    z_cur = lut & 1
    if z_cur == 1:
        state[z_mem, st_idx] = TMIN

    # first transition of each input, shifted by the (randomly varied) entry 0 of line_times
    a = state[a_mem, st_idx] + line_times[a_idx, 0, z_cur] * rand_gauss_dev(_seed ^ a_mem ^ z_cur, sd)
    b = state[b_mem, st_idx] + line_times[b_idx, 0, z_cur] * rand_gauss_dev(_seed ^ b_mem ^ z_cur, sd)

    previous_t = TMIN

    current_t = min(a, b)
    inputs = int(0)  # current input values: bit 0 is a, bit 1 is b

    while current_t < TMAX:
        z_val = z_cur & 1
        # consume the earlier of the two pending input transitions and fetch the next one
        if b < a:
            b_cur += 1
            b = state[b_mem + b_cur, st_idx]
            b += line_times[b_idx, 0, z_val ^ 1] * rand_gauss_dev(_seed ^ b_mem ^ z_val ^ 1, sd)
            # entry 1 of line_times is the minimum pulse width compared against below
            thresh = line_times[b_idx, 1, z_val] * rand_gauss_dev(_seed ^ b_mem ^ z_val, sd)
            inputs ^= 2
            next_t = b
        else:
            a_cur += 1
            a = state[a_mem + a_cur, st_idx]
            a += line_times[a_idx, 0, z_val ^ 1] * rand_gauss_dev(_seed ^ a_mem ^ z_val ^ 1, sd)
            thresh = line_times[a_idx, 1, z_val] * rand_gauss_dev(_seed ^ a_mem ^ z_val, sd)
            inputs ^= 1
            next_t = a

        if (z_cur & 1) != ((lut >> inputs) & 1):
            # we generate a toggle in z_mem, if:
            #   ( it is the first toggle in z_mem OR
            #   following toggle is earlier OR
            #   pulse is wide enough ) AND enough space in z_mem.
            if z_cur == 0 or next_t < current_t or (current_t - previous_t) > thresh:
                if z_cur < (z_cap - 1):
                    state[z_mem + z_cur, st_idx] = current_t
                    previous_t = current_t
                    z_cur += 1
                else:
                    # no space left: count the overflow and drop the previous toggle instead
                    overflows += 1
                    previous_t = state[z_mem + z_cur - 1, st_idx]
                    z_cur -= 1
            else:
                # pulse too short: remove the previous toggle
                z_cur -= 1
                if z_cur > 0:
                    previous_t = state[z_mem + z_cur - 1, st_idx]
                else:
                    previous_t = TMIN
        current_t = min(a, b)

    # terminate the output waveform
    if overflows > 0:
        state[z_mem + z_cur, st_idx] = TMAX_OVL
    else:
        state[z_mem + z_cur, st_idx] = a if a > b else b  # propagate overflow flags by storing biggest TMAX from input

317
src/kyupy/wave_sim_cuda.py

@ -1,317 +0,0 @@
import numpy as np
import math
from .wave_sim import WaveSim
from . import cuda
TMAX = np.float32(2 ** 127)  # large finite float32 used in place of +infinity; terminates a waveform
TMAX_OVL = np.float32(1.1 * 2 ** 127)  # slightly larger than TMAX; terminates a waveform that overflowed
TMIN = np.float32(-2 ** 127)  # large negative finite float32 used in place of -infinity
class WaveSimCuda(WaveSim):
    """A GPU-accelerated waveform-based combinational logic timing simulator.

    Extends :py:class:`WaveSim` by keeping device copies (``d_*`` attributes) of the
    simulator arrays and by running assignment, propagation, capture and re-assignment
    as CUDA kernels.
    """

    def __init__(self, circuit, timing, sims=8, wavecaps=16, strip_forks=False, keep_waveforms=True):
        """Initializes the base simulator and copies its arrays to the device.

        All arguments are forwarded unchanged to :py:class:`WaveSim`.
        """
        super().__init__(circuit, timing, sims, wavecaps, strip_forks, keep_waveforms)

        # Bit-packed input data: three bit-planes per interface position, one bit per simulation.
        self.tdata = np.zeros((len(self.interface), 3, (sims - 1) // 8 + 1), dtype='uint8')

        self.d_state = cuda.to_device(self.state)
        self.d_sat = cuda.to_device(self.sat)
        self.d_ops = cuda.to_device(self.ops)
        self.d_timing = cuda.to_device(self.timing)
        self.d_tdata = cuda.to_device(self.tdata)
        self.d_cdata = cuda.to_device(self.cdata)

        # Threads per block as (simulations, interface positions / operations).
        self._block_dim = (32, 16)

    def get_line_delay(self, line, polarity):
        """Returns the entry ``[line, 0, polarity]`` of the device-side timing array."""
        return self.d_timing[line, 0, polarity]

    def set_line_delay(self, line, polarity, delay):
        """Writes ``delay`` to the entry ``[line, 0, polarity]`` of the device-side timing array."""
        self.d_timing[line, 0, polarity] = delay

    def assign(self, vectors, time=0.0, offset=0):
        """Copies bit-packed input data to the device and builds the input waveforms from it.

        :param vectors: An object providing the packed data as ``vectors.bits`` and a ``vdim`` attribute.
        :param time: The time written for transitions generated by ``assign_kernel``.
        :param offset: Index of the first vector to use. Must be a multiple of 8 because the
            data is copied byte-wise.
        """
        assert (offset % 8) == 0
        byte_offset = offset // 8
        assert byte_offset < vectors.bits.shape[-1]
        pdim = min(vectors.bits.shape[-1] - byte_offset, self.tdata.shape[-1])

        self.tdata[..., 0:pdim] = vectors.bits[..., byte_offset:pdim + byte_offset]
        if vectors.vdim == 1:
            # Only one plane was supplied: invert plane 1 and clear plane 2.
            # NOTE(review): presumably this makes assign_kernel emit no transition at `time` - confirm.
            self.tdata[:, 1, 0:pdim] = ~self.tdata[:, 1, 0:pdim]
            self.tdata[:, 2, 0:pdim] = 0
        cuda.to_device(self.tdata, to=self.d_tdata)

        grid_dim = self._grid_dim(self.sims, len(self.interface))
        assign_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppi_offset,
                                                 len(self.interface), self.d_tdata, time)

    def _grid_dim(self, x, y):
        """Returns the number of blocks needed to cover ``x`` by ``y`` threads with ``self._block_dim``."""
        gx = math.ceil(x / self._block_dim[0])
        gy = math.ceil(y / self._block_dim[1])
        return gx, gy

    def propagate(self, sims=None, sd=0.0, seed=1):
        """Launches ``wave_kernel`` once per level, in level order.

        :param sims: Number of simulations to propagate. Defaults to, and is capped at, ``self.sims``.
        :param sd: Passed to the kernel, where it scales the random variation of the line times.
        :param seed: Passed to the kernel as seed for its random numbers.
        """
        if sims is None:
            sims = self.sims
        else:
            sims = min(sims, self.sims)
        for op_start, op_stop in zip(self.level_starts, self.level_stops):
            grid_dim = self._grid_dim(sims, op_stop - op_start)
            # Pass the device copy of sat: a host array would be transferred to the device
            # and back again on every single launch.
            wave_kernel[grid_dim, self._block_dim](self.d_ops, op_start, op_stop, self.d_state, self.d_sat, int(0),
                                                   sims, self.d_timing, sd, seed)
        cuda.synchronize()
        self.lst_eat_valid = False

    def wave(self, line, vector):
        """Returns the device-side waveform memory of ``line`` for simulation ``vector``.

        Returns ``None`` if ``line`` is negative or has no waveform memory (negative ``sat`` entry).
        """
        if line < 0:
            return None
        mem, wcap, _ = self.sat[line]
        if mem < 0:
            return None
        return self.d_state[mem:mem + wcap, vector]

    def capture(self, time=TMAX, sd=0, seed=1, cdata=None, offset=0):
        """Runs ``capture_kernel`` and copies the result back into ``self.cdata``.

        :param time: The capture time handed to the kernel.
        :param sd: The kernel receives ``sd * sqrt(2)``; values > 0 enable its probabilistic capture.
        :param seed: Seed for the kernel's random numbers.
        :param cdata: Optional array; if given, captured data is also copied into it starting at
            index ``offset`` of its second axis.
        :param offset: Start index within ``cdata``.
        :return: ``self.cdata``.
        """
        grid_dim = self._grid_dim(self.sims, len(self.interface))
        capture_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppo_offset,
                                                  self.d_cdata, time, sd * math.sqrt(2), seed)
        self.cdata[...] = self.d_cdata
        if cdata is not None:
            assert offset < cdata.shape[1]
            cap_dim = min(cdata.shape[1] - offset, self.sims)
            cdata[:, offset:cap_dim + offset] = self.cdata[:, 0:cap_dim]
        self.lst_eat_valid = True
        return self.cdata

    def reassign(self, time=0.0):
        """Runs ``reassign_kernel``, which rebuilds the input waveforms from the captured data.

        :param time: The time written for transitions generated by the kernel.
        """
        grid_dim = self._grid_dim(self.sims, len(self.interface))
        reassign_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppi_offset, self.ppo_offset,
                                                   self.d_cdata, time)
        cuda.synchronize()

    def wavecaps(self):
        """Runs ``wavecaps_kernel``, copies ``sat`` back to the host and returns its column 2."""
        # NOTE(review): the launch covers len(circuit.lines) threads while the kernel guards on
        # len(sat) - confirm that no sat rows beyond the circuit lines need to be updated.
        gx = math.ceil(len(self.circuit.lines) / 512)
        wavecaps_kernel[gx, 512](self.d_state, self.d_sat, self.sims)
        self.sat[...] = self.d_sat
        return self.sat[..., 2]
@cuda.jit()
def wavecaps_kernel(state, sat, sims):
    """Stores in ``sat[idx, 2]`` the number of waveform entries in use for each ``sat`` row.

    One thread handles one row of ``sat``. For each of the first ``sims`` simulations it walks
    the waveform up to and including its terminator (an entry ``>= TMAX``) and remembers the
    highest index visited; the stored value is that index plus one.
    """
    row = cuda.grid(1)
    if row >= len(sat):
        return
    base, capacity, _ = sat[row]
    if base < 0:
        return

    highest = 0
    for sim in range(sims):
        for slot in range(capacity):
            if slot > highest:
                highest = slot
            if state[base + slot, sim] >= TMAX:
                break

    sat[row, 2] = highest + 1
@cuda.jit()
def reassign_kernel(state, sat, ppi_offset, ppo_offset, cdata, ppi_time):
    """Rewrites each input waveform so that it ends at the value captured in ``cdata[..., 1]``.

    One thread handles one (simulation, interface position) pair. The value currently reached
    by the input waveform is derived from the parity of its toggles. The new waveform starts
    at that value and, if the captured value differs, toggles once at ``ppi_time``.
    """
    sim, pos = cuda.grid(2)
    if sim >= state.shape[-1]:
        return
    if ppo_offset + pos >= len(sat):
        return

    ppo_mem, _, _ = sat[ppo_offset + pos]
    ppi_mem, ppi_len, _ = sat[ppi_offset + pos]
    if ppo_mem < 0 or ppi_mem < 0:
        return

    captured = int(cdata[pos, sim, 1])

    # Each entry before the terminator flips the value, starting from 0.
    current = int(0)
    for k in range(ppi_len):
        if state[ppi_mem + k, sim] >= TMAX:
            break
        current ^= 1

    # make new waveform at PPI
    cursor = 0
    if current:
        state[ppi_mem + cursor, sim] = TMIN
        cursor += 1
    if current != captured:
        state[ppi_mem + cursor, sim] = ppi_time
        cursor += 1
    state[ppi_mem + cursor, sim] = TMAX
@cuda.jit()
def capture_kernel(state, sat, ppo_offset, cdata, time, s_sqrt2, seed):
    """Evaluates the waveforms at the outputs and writes seven values per output to ``cdata``.

    One thread handles one (simulation, interface position) pair. Written entries of
    ``cdata[y, vector, :]``:

    * 0: the accumulated erf terms if ``s_sqrt2 > 0``, otherwise the same as entry 1
      (presumably the probability of capturing a 1 - confirm),
    * 1: the captured value,
    * 2: the value after all transitions,
    * 3: whether entries 1 and 2 differ,
    * 4: the smallest transition time above ``TMIN``,
    * 5: the largest transition time above ``TMIN``,
    * 6: 1 if the waveform is terminated by ``TMAX_OVL``.
    """
    vector, y = cuda.grid(2)
    if ppo_offset + y >= len(sat):
        return
    line, tdim, _ = sat[ppo_offset + y]
    if line < 0 or vector >= state.shape[-1]:
        return

    sign = 0.5
    prob = 0.0
    earliest = TMAX
    latest = TMIN
    overflow = 0
    sampled = int(0)
    settled = int(0)

    for k in range(tdim):
        t = state[line + k, vector]
        if t >= TMAX:
            if t == TMAX_OVL:
                overflow = 1
            break
        sign = -sign
        settled ^= 1
        if t < time:
            sampled ^= 1
        if t <= TMIN:
            # an initial-value marker, not a real transition
            continue
        if s_sqrt2 > 0:
            prob += sign * (1 + math.erf((t - time) / s_sqrt2))
        earliest = min(earliest, t)
        latest = max(latest, t)

    if s_sqrt2 > 0:
        if sign < 0:
            prob += 1
        if prob >= 0.99:
            sampled = 1
        elif prob > 0.01:
            # draw a pseudo-random number in [0, 1) that depends on seed, vector and y
            seed = (seed << 4) + (vector << 20) + (y << 1)
            seed = int(0xDEECE66D) * seed + 0xB
            seed = int(0xDEECE66D) * seed + 0xB
            rnd = float((seed >> 8) & 0xffffff) / float(1 << 24)
            sampled = rnd < prob
        else:
            sampled = 0
    else:
        prob = sampled

    cdata[y, vector, 0] = prob
    cdata[y, vector, 1] = sampled
    cdata[y, vector, 2] = settled
    cdata[y, vector, 3] = (sampled != settled)
    cdata[y, vector, 4] = earliest
    cdata[y, vector, 5] = latest
    cdata[y, vector, 6] = overflow
@cuda.jit()
def assign_kernel(state, sat, ppi_offset, intf_len, tdata, time):
    """Builds the input waveforms from the bit-packed data in ``tdata``.

    One thread handles one (simulation, interface position) pair. The bit of a simulation is
    found in byte ``vector // 8``, most significant bit first. If the bit in plane 0 is set,
    the waveform starts with a ``TMIN`` entry. If the bit in plane 2 is set and the bits in
    planes 0 and 1 are equal, an entry at ``time`` follows. ``TMAX`` terminates the waveform.
    """
    vector, y = cuda.grid(2)
    if y >= intf_len:
        return
    line = sat[ppi_offset + y, 0]
    if line < 0 or vector >= state.shape[-1]:
        return

    byte = vector // 8
    mask = np.uint8(1 << (7 - (vector % 8)))
    bit0 = tdata[y, 0, byte] & mask
    bit1 = tdata[y, 1, byte] & mask
    bit2 = tdata[y, 2, byte] & mask

    cursor = 0
    if bit0:
        state[line + cursor, vector] = TMIN
        cursor += 1
    if bit2 and (bit0 == bit1):
        state[line + cursor, vector] = time
        cursor += 1
    state[line + cursor, vector] = TMAX
@cuda.jit(device=True)
def rand_gauss(seed, sd):
    """Returns a pseudo-random factor around 1.0 whose spread is scaled by ``sd``.

    For ``sd <= 0`` the result is exactly 1.0. Otherwise, twelve pseudo-random numbers from
    [0, 1) are summed and shifted by -6 (an approximation of a standard normal sample),
    scaled by ``sd``, and redrawn until the deviation is within +/-0.5.
    The result therefore lies in [0.5, 1.5].
    """
    if sd <= 0.0:
        return 1.0
    limit = 0.5
    while True:
        total = -6.0
        for _ in range(12):
            seed = int(0xDEECE66D) * seed + 0xB
            total += float((seed >> 8) & 0xffffff) / float(1 << 24)
        total *= sd
        if abs(total) <= limit:
            return total + 1.0
@cuda.jit()
def wave_kernel(ops, op_start, op_stop, state, sat, st_start, st_stop, line_times, sd, seed):
    """Computes the output waveform of one operation for one simulation.

    Thread (x, y) handles simulation ``st_start + x`` and operation ``op_start + y``.
    Each row of ``ops`` holds a look-up table and the ``sat`` indices of the output (z)
    and the two inputs (a, b). The input waveforms are merged in time order; whenever the
    look-up table yields a new output value, a toggle is added to the output waveform,
    or the previous toggle is removed again if the resulting pulse is too short.
    """
    x, y = cuda.grid(2)
    st_idx = st_start + x
    op_idx = op_start + y
    if st_idx >= st_stop: return
    if op_idx >= op_stop: return
    lut = ops[op_idx, 0]
    z_idx = ops[op_idx, 1]
    a_idx = ops[op_idx, 2]
    b_idx = ops[op_idx, 3]
    overflows = int(0)

    # per-thread seed, so that each output/simulation pair gets its own random numbers
    _seed = (seed << 4) + (z_idx << 20) + (st_idx << 1)

    a_mem = sat[a_idx, 0]
    b_mem = sat[b_idx, 0]
    z_mem, z_cap, _ = sat[z_idx]

    a_cur = int(0)
    b_cur = int(0)
    # bit 0 of the look-up table is the output value for inputs a=0, b=0;
    # z_cur is both the write position in z and (in its lowest bit) the current output value
    z_cur = lut & 1
    if z_cur == 1:
        state[z_mem, st_idx] = TMIN

    # first transition of each input, shifted by the (randomly varied) entry 0 of line_times
    a = state[a_mem, st_idx] + line_times[a_idx, 0, z_cur] * rand_gauss(_seed ^ a_mem ^ z_cur, sd)
    b = state[b_mem, st_idx] + line_times[b_idx, 0, z_cur] * rand_gauss(_seed ^ b_mem ^ z_cur, sd)

    previous_t = TMIN

    current_t = min(a, b)
    inputs = int(0)  # current input values: bit 0 is a, bit 1 is b

    while current_t < TMAX:
        z_val = z_cur & 1
        # consume the earlier of the two pending input transitions and fetch the next one
        if b < a:
            b_cur += 1
            b = state[b_mem + b_cur, st_idx]
            b += line_times[b_idx, 0, z_val ^ 1] * rand_gauss(_seed ^ b_mem ^ z_val ^ 1, sd)
            # entry 1 of line_times is the minimum pulse width compared against below
            thresh = line_times[b_idx, 1, z_val] * rand_gauss(_seed ^ b_mem ^ z_val, sd)
            inputs ^= 2
            next_t = b
        else:
            a_cur += 1
            a = state[a_mem + a_cur, st_idx]
            a += line_times[a_idx, 0, z_val ^ 1] * rand_gauss(_seed ^ a_mem ^ z_val ^ 1, sd)
            thresh = line_times[a_idx, 1, z_val] * rand_gauss(_seed ^ a_mem ^ z_val, sd)
            inputs ^= 1
            next_t = a

        if (z_cur & 1) != ((lut >> inputs) & 1):
            # we generate a toggle in z_mem, if:
            #   ( it is the first toggle in z_mem OR
            #   following toggle is earlier OR
            #   pulse is wide enough ) AND enough space in z_mem.
            if z_cur == 0 or next_t < current_t or (current_t - previous_t) > thresh:
                if z_cur < (z_cap - 1):
                    state[z_mem + z_cur, st_idx] = current_t
                    previous_t = current_t
                    z_cur += 1
                else:
                    # no space left: count the overflow and drop the previous toggle instead
                    overflows += 1
                    previous_t = state[z_mem + z_cur - 1, st_idx]
                    z_cur -= 1
            else:
                # pulse too short: remove the previous toggle
                z_cur -= 1
                if z_cur > 0:
                    previous_t = state[z_mem + z_cur - 1, st_idx]
                else:
                    previous_t = TMIN
        current_t = min(a, b)

    # terminate the output waveform
    if overflows > 0:
        state[z_mem + z_cur, st_idx] = TMAX_OVL
    else:
        state[z_mem + z_cur, st_idx] = a if a > b else b  # propagate overflow flags by storing biggest TMAX from input

2
tests/test_bench.py

@ -5,7 +5,7 @@ def test_b01(mydir):
with open(mydir / 'b01.bench', 'r') as f: with open(mydir / 'b01.bench', 'r') as f:
c = bench.parse(f.read()) c = bench.parse(f.read())
assert 92 == len(c.nodes) assert 92 == len(c.nodes)
c = bench.parse(mydir / 'b01.bench') c = bench.load(mydir / 'b01.bench')
assert 92 == len(c.nodes) assert 92 == len(c.nodes)

43
tests/test_logic.py

@ -23,6 +23,14 @@ def test_mvarray():
assert ary.length == 1 assert ary.length == 1
assert ary.width == 4 assert ary.width == 4
ary = lg.MVArray("1")
assert ary.length == 1
assert ary.width == 1
ary = lg.MVArray(["1"])
assert ary.length == 1
assert ary.width == 1
# instantiation with multiple vectors # instantiation with multiple vectors
ary = lg.MVArray([[0, 0], [0, 1], [1, 0], [1, 1]]) ary = lg.MVArray([[0, 0], [0, 1], [1, 0], [1, 1]])
@ -53,14 +61,14 @@ def test_mvarray():
# casting to 4-valued logic # casting to 4-valued logic
ary = lg.MVArray([0, 1, 2, None, 'F']) ary = lg.MVArray([0, 1, 2, None, 'F'], m=4)
assert ary.data[0] == lg.ZERO assert ary.data[0] == lg.ZERO
assert ary.data[1] == lg.ONE assert ary.data[1] == lg.ONE
assert ary.data[2] == lg.UNKNOWN assert ary.data[2] == lg.UNKNOWN
assert ary.data[3] == lg.UNASSIGNED assert ary.data[3] == lg.UNASSIGNED
assert ary.data[4] == lg.ZERO assert ary.data[4] == lg.ZERO
ary = lg.MVArray("0-X1PRFN") ary = lg.MVArray("0-X1PRFN", m=4)
assert ary.data[0] == lg.ZERO assert ary.data[0] == lg.ZERO
assert ary.data[1] == lg.UNASSIGNED assert ary.data[1] == lg.UNASSIGNED
assert ary.data[2] == lg.UNKNOWN assert ary.data[2] == lg.UNKNOWN
@ -77,7 +85,7 @@ def test_mvarray():
assert ary.data[1] == lg.ONE assert ary.data[1] == lg.ONE
assert ary.data[2] == lg.UNKNOWN assert ary.data[2] == lg.UNKNOWN
assert ary.data[3] == lg.UNASSIGNED assert ary.data[3] == lg.UNASSIGNED
assert ary.data[4] == lg.FALLING assert ary.data[4] == lg.FALL
ary = lg.MVArray("0-X1PRFN", m=8) ary = lg.MVArray("0-X1PRFN", m=8)
assert ary.data[0] == lg.ZERO assert ary.data[0] == lg.ZERO
@ -85,8 +93,8 @@ def test_mvarray():
assert ary.data[2] == lg.UNKNOWN assert ary.data[2] == lg.UNKNOWN
assert ary.data[3] == lg.ONE assert ary.data[3] == lg.ONE
assert ary.data[4] == lg.PPULSE assert ary.data[4] == lg.PPULSE
assert ary.data[5] == lg.RISING assert ary.data[5] == lg.RISE
assert ary.data[6] == lg.FALLING assert ary.data[6] == lg.FALL
assert ary.data[7] == lg.NPULSE assert ary.data[7] == lg.NPULSE
# copy constructor and casting # copy constructor and casting
@ -103,3 +111,28 @@ def test_mvarray():
ary2 = lg.MVArray(ary, m=2) ary2 = lg.MVArray(ary, m=2)
assert ary2.data[1] == lg.ZERO assert ary2.data[1] == lg.ZERO
assert ary2.data[7] == lg.ONE assert ary2.data[7] == lg.ONE
def test_mv_operations():
x1_2v = lg.MVArray("0011", m=2)
x2_2v = lg.MVArray("0101", m=2)
x1_4v = lg.MVArray("0000XXXX----1111", m=4)
x2_4v = lg.MVArray("0X-10X-10X-10X-1", m=4)
x1_8v = lg.MVArray("00000000XXXXXXXX--------11111111PPPPPPPPRRRRRRRRFFFFFFFFNNNNNNNN", m=8)
x2_8v = lg.MVArray("0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN", m=8)
assert str(lg.mv_not(x1_2v)) == "['1100']"
assert str(lg.mv_not(x1_4v)) == "['1111XXXXXXXX0000']"
assert str(lg.mv_not(x1_8v)) == "['11111111XXXXXXXXXXXXXXXX00000000NNNNNNNNFFFFFFFFRRRRRRRRPPPPPPPP']"
assert str(lg.mv_or(x1_2v, x2_2v)) == "['0111']"
assert str(lg.mv_or(x1_4v, x2_4v)) == "['0XX1XXX1XXX11111']"
assert str(lg.mv_or(x1_8v, x2_8v)) == "['0XX1PRFNXXX1XXXXXXX1XXXX11111111PXX1PRFNRXX1RRNNFXX1FNFNNXX1NNNN']"
assert str(lg.mv_and(x1_2v, x2_2v)) == "['0001']"
assert str(lg.mv_and(x1_4v, x2_4v)) == "['00000XXX0XXX0XX1']"
assert str(lg.mv_and(x1_8v, x2_8v)) == "['000000000XXXXXXX0XXXXXXX0XX1PRFN0XXPPPPP0XXRPRPR0XXFPPFF0XXNPRFN']"
assert str(lg.mv_xor(x1_2v, x2_2v)) == "['0110']"
assert str(lg.mv_xor(x1_4v, x2_4v)) == "['0XX1XXXXXXXX1XX0']"
assert str(lg.mv_xor(x1_8v, x2_8v)) == "['0XX1PRFNXXXXXXXXXXXXXXXX1XX0NFRPPXXNPRFNRXXFRPNFFXXRFNPRNXXPNFRP']"

2
tests/test_logic_sim.py

@ -141,7 +141,7 @@ def test_vd3():
def test_b01(mydir): def test_b01(mydir):
c = bench.parse(mydir / 'b01.bench') c = bench.load(mydir / 'b01.bench')
# 2-valued # 2-valued
s = LogicSim(c, 8) s = LogicSim(c, 8)

6
tests/test_sdf.py

@ -74,13 +74,13 @@ def test_parse():
def test_b14(mydir): def test_b14(mydir):
df = sdf.parse(mydir / 'b14.sdf.gz') df = sdf.load(mydir / 'b14.sdf.gz')
assert df.name == 'b14' assert df.name == 'b14'
def test_gates(mydir): def test_gates(mydir):
c = verilog.parse(mydir / 'gates.v') c = verilog.load(mydir / 'gates.v')
df = sdf.parse(mydir / 'gates.sdf') df = sdf.load(mydir / 'gates.sdf')
lt = df.annotation(c, pin_index, dataset=1) lt = df.annotation(c, pin_index, dataset=1)
nand_a = c.cells['nandgate'].ins[0] nand_a = c.cells['nandgate'].ins[0]
nand_b = c.cells['nandgate'].ins[1] nand_b = c.cells['nandgate'].ins[1]

2
tests/test_stil.py

@ -2,7 +2,7 @@ from kyupy import stil
def test_b14(mydir): def test_b14(mydir):
s = stil.parse(mydir / 'b14.stuck.stil.gz') s = stil.load(mydir / 'b14.stuck.stil.gz')
assert 10 == len(s.signal_groups) assert 10 == len(s.signal_groups)
assert 1 == len(s.scan_chains) assert 1 == len(s.scan_chains)
assert 2163 == len(s.calls) assert 2163 == len(s.calls)

3
tests/test_verilog.py

@ -5,5 +5,4 @@ def test_b01(mydir):
with open(mydir / 'b01.v', 'r') as f: with open(mydir / 'b01.v', 'r') as f:
modules = verilog.parse(f.read()) modules = verilog.parse(f.read())
assert modules is not None assert modules is not None
assert verilog.parse(mydir / 'b01.v') is not None assert verilog.load(mydir / 'b01.v') is not None

15
tests/test_wave_sim.py

@ -1,11 +1,10 @@
import numpy as np import numpy as np
from kyupy.wave_sim import WaveSim, wave_eval, TMIN, TMAX from kyupy.wave_sim import WaveSim, WaveSimCuda, wave_eval, TMIN, TMAX
from kyupy.logic_sim import LogicSim from kyupy.logic_sim import LogicSim
from kyupy import verilog from kyupy import verilog
from kyupy import sdf from kyupy import sdf
from kyupy.saed import pin_index from kyupy.saed import pin_index
from kyupy.packed_vectors import PackedVectors from kyupy.packed_vectors import PackedVectors
from kyupy.wave_sim_cuda import WaveSimCuda
def test_wave_eval(): def test_wave_eval():
@ -122,24 +121,24 @@ def compare_to_logic_sim(wsim):
def test_b14(mydir): def test_b14(mydir):
c = verilog.parse(mydir / 'b14.v.gz', branchforks=True) c = verilog.load(mydir / 'b14.v.gz', branchforks=True)
df = sdf.parse(mydir / 'b14.sdf.gz') df = sdf.load(mydir / 'b14.sdf.gz')
lt = df.annotation(c, pin_index) lt = df.annotation(c, pin_index)
wsim = WaveSim(c, lt, 8) wsim = WaveSim(c, lt, 8)
compare_to_logic_sim(wsim) compare_to_logic_sim(wsim)
def test_b14_strip_forks(mydir): def test_b14_strip_forks(mydir):
c = verilog.parse(mydir / 'b14.v.gz', branchforks=True) c = verilog.load(mydir / 'b14.v.gz', branchforks=True)
df = sdf.parse(mydir / 'b14.sdf.gz') df = sdf.load(mydir / 'b14.sdf.gz')
lt = df.annotation(c, pin_index) lt = df.annotation(c, pin_index)
wsim = WaveSim(c, lt, 8, strip_forks=True) wsim = WaveSim(c, lt, 8, strip_forks=True)
compare_to_logic_sim(wsim) compare_to_logic_sim(wsim)
def test_b14_cuda(mydir): def test_b14_cuda(mydir):
c = verilog.parse(mydir / 'b14.v.gz', branchforks=True) c = verilog.load(mydir / 'b14.v.gz', branchforks=True)
df = sdf.parse(mydir / 'b14.sdf.gz') df = sdf.load(mydir / 'b14.sdf.gz')
lt = df.annotation(c, pin_index) lt = df.annotation(c, pin_index)
wsim = WaveSimCuda(c, lt, 8) wsim = WaveSimCuda(c, lt, 8)
compare_to_logic_sim(wsim) compare_to_logic_sim(wsim)

Loading…
Cancel
Save