Browse Source

New m-valued logic arrays, documentation, 0.0.2

- MVArray for multi-valued logic
- BPArray for bit-parallel storage layout
- Started documenting with Sphinx
- Migrated simulators to new BPArray
main v0.0.2
Stefan Holst 4 years ago
  1. 5
  2. 205
  3. 56
  4. 28
  5. 20
  6. 64
  7. 29
  8. 12
  9. 42
  10. 20
  11. 7
  12. 28
  13. 37
  14. 23
  15. 215
  16. 402
  17. 284
  18. 299
  19. 64
  20. 151
  21. 37
  22. 337
  23. 317
  24. 2
  25. 45
  26. 214
  27. 201
  28. 88
  29. 6
  30. 2
  31. 3
  32. 46

.gitignore vendored

@ -3,3 +3,8 @@ @@ -3,3 +3,8 @@

UsageExamples.ipynb → Demo.ipynb

@ -22,8 +22,8 @@ @@ -22,8 +22,8 @@
"source": [
"from kyupy import bench\n",
"# parse a file\n",
"b01 = bench.parse('tests/b01.bench')\n",
"# load a file\n",
"b01 = bench.load('tests/b01.bench')\n",
"# ... or specify the circuit as string \n",
"mycircuit = bench.parse('input(a,b) output(o1,o2,o3) x=buf(a) o1=not(x) o2=buf(x) o3=buf(x)')"
@ -44,7 +44,7 @@ @@ -44,7 +44,7 @@
"data": {
"text/plain": [
"<Circuit 'tests/b01' with 92 nodes, 130 lines, 4 ports>"
"<Circuit 'tests/b01.bench' with 92 nodes, 130 lines, 4 ports>"
"execution_count": 2,
@ -373,7 +373,7 @@ @@ -373,7 +373,7 @@
"source": [
"from kyupy import verilog\n",
"b14 = verilog.parse('tests/b14.v.gz')\n",
"b14 = verilog.load('tests/b14.v.gz')\n",
@ -456,7 +456,7 @@ @@ -456,7 +456,7 @@
"source": [
"from kyupy import verilog\n",
"b14 = verilog.parse('tests/b14.v.gz')\n",
"b14 = verilog.load('tests/b14.v.gz')\n",
@ -567,11 +567,11 @@ @@ -567,11 +567,11 @@
"outputs": [],
"source": [
"from kyupy import verilog, stil\n",
"from kyupy.logic import MVArray, BPArray\n",
"from kyupy.logic_sim import LogicSim\n",
"from kyupy.packed_vectors import PackedVectors\n",
"b14 = verilog.parse('tests/b14.v.gz')\n",
"s = stil.parse('tests/b14.stuck.stil.gz')\n",
"b14 = verilog.load('tests/b14.v.gz')\n",
"s = stil.load('tests/b14.stuck.stil.gz')\n",
"stuck_tests = s.tests(b14)\n",
"stuck_responses = s.responses(b14)"
@ -580,7 +580,7 @@ @@ -580,7 +580,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Tests and responses are instances of `PackedVectors`. Its length is the number of test vectors stored (`nvectors`), its `width` is the number of values in a vector, and its `vdim` is the number of bits used for storing one value. By default, the stil parser returns 4-valued test vectors (`vdim=2`)."
"Tests and responses are instances of `MVArray`. Its `length` is the number of test vectors stored, its `width` is the number of values in a vector. By default, the stil parser returns 8-valued test vectors (`m=8`)."
@ -591,7 +591,7 @@ @@ -591,7 +591,7 @@
"data": {
"text/plain": [
"<PackedVectors nvectors=1081, width=306, vdim=2>"
"<MVArray length=1081 width=306 m=8 nbytes=330786>"
"execution_count": 19,
@ -607,7 +607,7 @@ @@ -607,7 +607,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"The data is stored in a bit-parallel fashion. This internal storage (an `ndarray` of `uint8`) is accessible via `bits`. The first axis is the width, the second axis is `vdim`, the last axis goes along the test set. This last axis is about `nvectors / 8` in length. "
"The internal storage (an `ndarray` of `uint8`) is accessible via `data`. The first axis is the width, and the last axis goes along the test set."
@ -618,7 +618,7 @@ @@ -618,7 +618,7 @@
"data": {
"text/plain": [
"(306, 2, 136)"
"(306, 1081)"
"execution_count": 20,
@ -627,14 +627,14 @@ @@ -627,14 +627,14 @@
"source": [
"cell_type": "markdown",
"metadata": {},
"source": [
"The subscript accessor returns a string representation of the given test vector number. Possible values are '0', '1', '-', and 'X'."
"The subscript accessor returns a string representation of the given test vector number. Possible values are '0', '1', '-', 'X', 'R', 'F', 'P', and 'N'."
@ -645,7 +645,7 @@ @@ -645,7 +645,7 @@
"data": {
"text/plain": [
"execution_count": 21,
@ -681,25 +681,80 @@ @@ -681,25 +681,80 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"The order of values in the vectors correspond to the circuit's interface followed by the scan flip-flops as they appear in `b14.cells`. The test data can be used directly in the simulators as they use the same ordering convention. The following code performs a 4-valued logic simulation and stores the results in a new instance of `PackedVectors`."
"The order of values in the vectors correspond to the circuit's interface followed by the scan flip-flops as they appear in `b14.cells`.\n",
"The test data can be used directly in the simulators as they use the same ordering convention.\n",
"The logic simulator uses bit-parallel storage of logic values, but our loaded test data uses one `uint8` per logic value.\n",
"To convert the storage layout, we instanciate a `BPArray` for the input stimuli.\n",
"The storage layout is more compact, but individual values cannot be easily accessed anymore."
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
"data": {
"text/plain": [
"<BPArray length=1081 width=306 m=8 bytes=124848>"
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
"source": [
"stuck_tests_bp = BPArray(stuck_tests)\n",
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
"data": {
"text/plain": [
"(306, 3, 136)"
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
"source": [
"cell_type": "markdown",
"metadata": {},
"source": [
"The following code performs a 8-valued logic simulation and stores the results in a new instance of `BPArray`.\n",
"The packed array is unpacked into an `MVArray` for value access."
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"responses = PackedVectors(len(stuck_tests), stuck_tests.width, 2)\n",
"simulator = LogicSim(b14, len(responses), 2)\n",
"responses_bp = BPArray((stuck_tests_bp.width, len(stuck_tests_bp)))\n",
"simulator = LogicSim(b14, sims=len(stuck_tests_bp))\n",
"responses = MVArray(responses_bp)"
"cell_type": "code",
"execution_count": 24,
"execution_count": 26,
"metadata": {},
"outputs": [
@ -708,7 +763,7 @@ @@ -708,7 +763,7 @@
"execution_count": 24,
"execution_count": 26,
"metadata": {},
"output_type": "execute_result"
@ -726,7 +781,7 @@ @@ -726,7 +781,7 @@
"cell_type": "code",
"execution_count": 25,
"execution_count": 27,
"metadata": {},
"outputs": [
@ -752,39 +807,32 @@ @@ -752,39 +807,32 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Transition faults require test vector pairs for testing. These pairs are generated by `tests8v`, assuming a launch-on-capture scheme (two functional clock cycles after scan-in)."
"Transition faults require test vector pairs for testing. These pairs are generated by `tests_loc`, assuming a launch-on-capture scheme (two functional clock cycles after scan-in)."
"cell_type": "code",
"execution_count": 26,
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"s = stil.parse('tests/b14.transition.stil.gz')\n",
"trans_tests = s.tests8v(b14)\n",
"s = stil.load('tests/b14.transition.stil.gz')\n",
"trans_tests = s.tests_loc(b14)\n",
"trans_responses = s.responses(b14)"
"cell_type": "markdown",
"metadata": {},
"source": [
"The returned test data is now 8-valued (`vdim=3`)"
"cell_type": "code",
"execution_count": 27,
"execution_count": 29,
"metadata": {},
"outputs": [
"data": {
"text/plain": [
"<PackedVectors nvectors=1392, width=306, vdim=3>"
"<MVArray length=1392 width=306 m=8 nbytes=425952>"
"execution_count": 27,
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
@ -802,16 +850,16 @@ @@ -802,16 +850,16 @@
"cell_type": "code",
"execution_count": 28,
"execution_count": 30,
"metadata": {},
"outputs": [
"data": {
"text/plain": [
"execution_count": 28,
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
@ -829,20 +877,22 @@ @@ -829,20 +877,22 @@
"cell_type": "code",
"execution_count": 29,
"execution_count": 31,
"metadata": {},
"outputs": [],
"source": [
"responses = PackedVectors(len(trans_tests), trans_tests.width, 3)\n",
"simulator = LogicSim(b14, len(responses), 3)\n",
"trans_tests_bp = BPArray(trans_tests)\n",
"responses_bp = BPArray((trans_tests_bp.width, len(trans_tests_bp)))\n",
"simulator = LogicSim(b14, sims=len(trans_tests_bp))\n",
"responses = MVArray(responses_bp)"
"cell_type": "code",
"execution_count": 30,
"execution_count": 32,
"metadata": {},
"outputs": [
@ -851,7 +901,7 @@ @@ -851,7 +901,7 @@
"execution_count": 30,
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
@ -869,7 +919,7 @@ @@ -869,7 +919,7 @@
"cell_type": "code",
"execution_count": 31,
"execution_count": 33,
"metadata": {},
"outputs": [
@ -907,14 +957,14 @@ @@ -907,14 +957,14 @@
"cell_type": "code",
"execution_count": 32,
"execution_count": 34,
"metadata": {},
"outputs": [],
"source": [
"from kyupy import sdf\n",
"from kyupy.saed import pin_index\n",
"df = sdf.parse('tests/b14.sdf.gz')\n",
"df = sdf.load('tests/b14.sdf.gz')\n",
"lt = df.annotation(b14, pin_index, dataset=0, interconnect=False)"
@ -927,7 +977,7 @@ @@ -927,7 +977,7 @@
"cell_type": "code",
"execution_count": 33,
"execution_count": 35,
"metadata": {},
"outputs": [
@ -936,7 +986,7 @@ @@ -936,7 +986,7 @@
"(46891, 2, 2)"
"execution_count": 33,
"execution_count": 35,
"metadata": {},
"output_type": "execute_result"
@ -954,7 +1004,7 @@ @@ -954,7 +1004,7 @@
"cell_type": "code",
"execution_count": 34,
"execution_count": 36,
"metadata": {},
"outputs": [
@ -963,7 +1013,7 @@ @@ -963,7 +1013,7 @@
"execution_count": 34,
"execution_count": 36,
"metadata": {},
"output_type": "execute_result"
@ -991,11 +1041,11 @@ @@ -991,11 +1041,11 @@
"cell_type": "code",
"execution_count": 35,
"execution_count": 37,
"metadata": {},
"outputs": [],
"source": [
"from kyupy.wave_sim_cuda import WaveSimCuda, TMAX\n",
"from kyupy.wave_sim import WaveSimCuda, TMAX\n",
"import numpy as np\n",
"wsim = WaveSimCuda(b14, lt, sims=32, wavecaps=16)"
@ -1010,7 +1060,7 @@ @@ -1010,7 +1060,7 @@
"cell_type": "code",
"execution_count": 36,
"execution_count": 38,
"metadata": {},
"outputs": [
@ -1043,23 +1093,23 @@ @@ -1043,23 +1093,23 @@
"metadata": {},
"source": [
"This is a typical simulation loop where the number of patterns is larger than the number of simulators available.\n",
"We simulate `trans_tests`.\n",
"The timing simulator accepts 4-valued and 8-valued `PackedVectors`, but it will return response (capture) data in a different format."
"We simulate `trans_tests_bp`.\n",
"The timing simulator accepts 8-valued `BPArray`s, but it will return response (capture) data in a different format."
"cell_type": "code",
"execution_count": 37,
"execution_count": 39,
"metadata": {},
"outputs": [],
"source": [
"nvectors = 128 # len(trans_tests) # Feel free to simulate all tests if CUDA is set up correctly.\n",
"sims = 128 # len(trans_tests_bp) # Feel free to simulate all tests if CUDA is set up correctly.\n",
"cdata = np.zeros((len(wsim.interface), nvectors, 7)) # space to store all capture data\n",
"cdata = np.zeros((len(wsim.interface), sims, 7)) # space to store all capture data\n",
"for offset in range(0, nvectors, wsim.sims):\n",
" wsim.assign(trans_tests, offset=offset)\n",
" wsim.propagate(sims=nvectors-offset)\n",
"for offset in range(0, sims, wsim.sims):\n",
" wsim.assign(trans_tests_bp, offset=offset)\n",
" wsim.propagate(sims=sims-offset)\n",
" wsim.capture(time=2.5, cdata=cdata, offset=offset) # capture at time 2.5"
@ -1079,7 +1129,7 @@ @@ -1079,7 +1129,7 @@
"cell_type": "code",
"execution_count": 38,
"execution_count": 40,
"metadata": {},
"outputs": [
@ -1088,7 +1138,7 @@ @@ -1088,7 +1138,7 @@
"(306, 128, 7)"
"execution_count": 38,
"execution_count": 40,
"metadata": {},
"output_type": "execute_result"
@ -1106,7 +1156,7 @@ @@ -1106,7 +1156,7 @@
"cell_type": "code",
"execution_count": 39,
"execution_count": 41,
"metadata": {},
"outputs": [
@ -1139,7 +1189,7 @@ @@ -1139,7 +1189,7 @@
"cell_type": "code",
"execution_count": 40,
"execution_count": 42,
"metadata": {},
"outputs": [
@ -1148,7 +1198,7 @@ @@ -1148,7 +1198,7 @@
"execution_count": 40,
"execution_count": 42,
"metadata": {},
"output_type": "execute_result"
@ -1166,7 +1216,7 @@ @@ -1166,7 +1216,7 @@
"cell_type": "code",
"execution_count": 41,
"execution_count": 43,
"metadata": {},
"outputs": [
@ -1175,7 +1225,7 @@ @@ -1175,7 +1225,7 @@
"execution_count": 41,
"execution_count": 43,
"metadata": {},
"output_type": "execute_result"
@ -1193,7 +1243,7 @@ @@ -1193,7 +1243,7 @@
"cell_type": "code",
"execution_count": 42,
"execution_count": 44,
"metadata": {},
"outputs": [
@ -1202,7 +1252,7 @@ @@ -1202,7 +1252,7 @@
"execution_count": 42,
"execution_count": 44,
"metadata": {},
"output_type": "execute_result"
@ -1222,15 +1272,14 @@ @@ -1222,15 +1272,14 @@
"If there is an error related to `nvvm`, you probably need to set up some environment variables:\n",
"%env LD_LIBRARY_PATH=/usr/local/cuda/lib64\n",
"%env NUMBAPRO_NVVM=/usr/local/cuda/nvvm/lib64/\n",
"%env NUMBAPRO_LIBDEVICE=/usr/local/cuda/nvvm/libdevice\n",
"%env CUDA_HOME=/usr/local/cuda\n",
"If problems persist, refer to documentations for numba and cuda. "
"cell_type": "code",
"execution_count": 43,
"execution_count": 45,
"metadata": {},
"outputs": [
@ -1252,7 +1301,7 @@ @@ -1252,7 +1301,7 @@
"execution_count": 43,
"execution_count": 45,
"metadata": {},
"output_type": "execute_result"


@ -1,56 +0,0 @@ @@ -1,56 +0,0 @@
KyuPy - Processing VLSI Circuits With Ease
KyuPy is a python package for high-performance processing and analysis of
non-hierarchical VLSI designs. Its purpose is to provide a rapid prototyping
platform to aid and accelerate research in the fields of VLSI test, diagnosis
and reliability. KyuPy is freely available under the MIT license.
Main Features
* Partial [lark]( parsers for common files used with synthesized designs:
bench, gate-level verilog, standard delay format (SDF), standard test interface language (STIL)
* Bit-parallel gate-level 2-, 4-, and 8-valued logic simulation
* GPU-accelerated high-throughput gate-level timing simulation
* High-performance through the use of [numpy]( and [numba](
Getting Started
KyuPy requires Python 3.6 or newer.
Install the latest release by running:
pip3 install --user kyupy
For best performance, ensure you have [numba]( installed:
pip3 install --user numba
GPU/CUDA support may [require some additional setup](
If CUDA or numba is not available, KyuPy will automatically fall back to slow, pure python execution.
The Jupyter Notebook [UsageExamples.ipynb]( on GitHub
contains some useful examples to get familiar with the API.
To contribute to KyuPy or simply explore the source code, clone the KyuPy [repository]( on GitHub.
Within your local checkout, run:
pip3 install --user -e .
to make the kyupy package available in your python environment.
The source code comes with tests that can be run with:
KyuPy depends on the following packages:
* [lark-parser](
* [numpy](
* [numba]( (optional, required only for GPU/CUDA support)


@ -0,0 +1,28 @@ @@ -0,0 +1,28 @@
KyuPy - Pythonic Processing of VLSI Circuits
KyuPy is a Python package for processing and analysis of non-hierarchical gate-level VLSI designs.
It contains fundamental building blocks for research software in the fields of VLSI test, diagnosis and reliability:
* Efficient data structures for gate-level circuits and related design data.
* Partial `lark <>`_ parsers for common design files like
bench, gate-level verilog, standard delay format (SDF), standard test interface language (STIL).
* Bit-parallel gate-level 2-, 4-, and 8-valued logic simulation.
* GPU-accelerated high-throughput gate-level timing simulation.
* High-performance through the use of `numpy <>`_ and `numba <>`_.
Getting Started
KyuPy is available in `PyPI <>`_.
It requires Python 3.6 or newer, `lark-parser <>`_, and `numpy`_.
Although optional, `numba`_ should be installed for best performance.
GPU/CUDA support in numba may `require some additional setup <>`_.
If numba is not available, KyuPy will automatically fall back to slow, pure Python execution.
The Jupyter Notebook `Demo.ipynb <>`_ contains some useful examples to get familiar with the API.
To work with the latest pre-release source code, clone the `KyuPy GitHub repository <>`_.
Run ``pip3 install --user -e .`` within your local checkout to make the package available in your Python environment.
The source code comes with tests that can be run with ``pytest``.


@ -0,0 +1,20 @@ @@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXBUILD ?= sphinx-build
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile


@ -0,0 +1,64 @@ @@ -0,0 +1,64 @@
# Configuration file for the Sphinx documentation builder.
# This file only contains a selection of the most common options. For a full
# list see the documentation:
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
import os
import sys
import sphinx_rtd_theme
sys.path.insert(0, os.path.abspath('../src'))
# -- Project information -----------------------------------------------------
project = 'KyuPy'
copyright = '2020, Stefan Holst'
author = 'Stefan Holst'
# The full version, including alpha/beta/rc tags
release = '0.0.2'
# -- General configuration ---------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
# html_theme = 'alabaster'
html_theme = 'sphinx_rtd_theme'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
autodoc_default_options = {
'member-order': 'bysource',


@ -0,0 +1,29 @@ @@ -0,0 +1,29 @@
Data Structures
KyuPy provides two types of core data structures, one for gate-level circuits, and a few others for representing and storing logic data and signal values.
The data structures are designed to work together nicely with numpy arrays.
For example, all the nodes and connections in the circuit graph have consecutive integer indices that can be used to access ndarrays with associated data.
Circuit graphs also define an ordering of inputs, outputs and other nodes to easily process test vector data and alike.
Circuit Graph - :mod:`kyupy.circuit`
.. automodule:: kyupy.circuit
.. autoclass:: kyupy.circuit.Node
.. autoclass:: kyupy.circuit.Line
.. autoclass:: kyupy.circuit.Circuit
Multi-Valued Logic - :mod:`kyupy.logic`
.. automodule:: kyupy.logic


@ -0,0 +1,12 @@ @@ -0,0 +1,12 @@
.. include:: ../README.rst
API Reference
.. toctree::
:maxdepth: 2


@ -0,0 +1,42 @@ @@ -0,0 +1,42 @@
KyuPy contains simple (and often incomplete) parsers for common file formats.
These parsers are tailored to the most common use-cases to keep the grammars and the code-base as simple as possible.
Each of the modules export a function ``parse()`` for parsing a string directly and a function
``load()`` for loading a file. Files with a '.gz' extension are uncompressed on-the-fly.
Verilog - :mod:`kyupy.verilog`
.. automodule:: kyupy.verilog
:members: parse, load
Bench Format - :mod:`kyupy.bench`
.. automodule:: kyupy.bench
:members: parse, load
Standard Test Interface Language - :mod:`kyupy.stil`
.. automodule:: kyupy.stil
:members: parse, load
.. autoclass:: kyupy.stil.StilFile
Standard Delay Format - :mod:`kyupy.sdf`
.. automodule:: kyupy.sdf
:members: parse, load
.. autoclass:: kyupy.sdf.DelayFile


@ -0,0 +1,20 @@ @@ -0,0 +1,20 @@
Logic Simulation - :mod:`kyupy.logic_sim`
.. autoclass:: kyupy.logic_sim.LogicSim
Timing Simulation - :mod:`kyupy.wave_sim`
.. automodule:: kyupy.wave_sim
.. autoclass:: kyupy.wave_sim.WaveSim
.. autoclass:: kyupy.wave_sim.WaveSimCuda


@ -1,14 +1,13 @@ @@ -1,14 +1,13 @@
from setuptools import setup, find_packages
with open('', 'r') as f:
with open('README.rst', 'r') as f:
long_description =
description='High-performance processing and analysis of non-hierarchical VLSI designs',
package_dir={'': 'src'},
@ -16,7 +15,7 @@ setup( @@ -16,7 +15,7 @@ setup(


@ -1,10 +1,13 @@ @@ -1,10 +1,13 @@
"""This package provides tools for high-performance processing and validation
of non-hierarchical VLSI circuits to aid rapid prototyping of research code
in the fields of VLSI test, diagnosis and reliability.
"""A package for processing and analysis of non-hierarchical gate-level VLSI designs.
It contains fundamental building blocks for research software in the fields of VLSI test, diagnosis and reliability.
import time
import importlib.util
import gzip
import numpy as np
class Log:
@ -97,12 +100,27 @@ if importlib.util.find_spec('numba') is not None: @@ -97,12 +100,27 @@ if importlib.util.find_spec('numba') is not None:
from numba import cuda
except CudaSupportError:
log.warn('Cuda unavailable. Falling back to pure python')
log.warn('Cuda unavailable. Falling back to pure Python.')
cuda = MockCuda()
numba = MockNumba()
cuda = MockCuda()
log.warn('Numba unavailable. Falling back to pure python')
log.warn('Numba unavailable. Falling back to pure Python.')
_pop_count_lut = np.asarray([bin(x).count('1') for x in range(256)])
def popcount(a):
return np.sum(_pop_count_lut[a])
def readtext(file):
if hasattr(file, 'read'):
if str(file).endswith('.gz'):
with, 'rt') as f:
with open(file, 'rt') as f:


@ -1,5 +1,16 @@ @@ -1,5 +1,16 @@
"""A parser for the ISCAS89 benchmark format.
The ISCAS89 benchmark format (`.bench`-suffix) is a very simple textual description of gate-level netlists.
Historically it was first used in the
`ISCAS89 benchmark set <>`_.
Besides loading these benchmarks, this module is also useful for easily constructing simple circuits:
``c = bench.parse('input(x, y) output(a, o, n) a=and(x,y) o=or(x,y) n=not(x)')``.
from lark import Lark, Transformer
from .circuit import Circuit, Node, Line
from . import readtext
class BenchTransformer(Transformer):
@ -21,7 +32,6 @@ class BenchTransformer(Transformer): @@ -21,7 +32,6 @@ class BenchTransformer(Transformer):
[Line(self.c, d, cell) for d in drivers]
def parse(bench):
grammar = r"""
start: (statement)*
statement: input | output | assignment
@ -32,12 +42,23 @@ def parse(bench): @@ -32,12 +42,23 @@ def parse(bench):
NAME: /[-_a-z0-9]+/i
%ignore ( /\r?\n/ | "#" /[^\n]*/ | /[\t\f ]/ )+
name = None
if '(' not in str(bench): # No parentheses?: Assuming it is a file name.
name = str(bench).replace('.bench', '')
with open(bench, 'r') as f:
text =
text = bench
def parse(text, name=None):
"""Parses the given ``text`` as ISCAS89 bench code.
:param text: A string with bench code.
:param name: The name of the circuit. Circuit names are not included in bench descriptions.
:return: A :class:`Circuit` object.
return Lark(grammar, parser="lalr", transformer=BenchTransformer(name)).parse(text)
def load(file, name=None):
"""Parses the contents of ``file`` as ISCAS89 bench code.
:param file: The file to be loaded.
:param name: The name of the circuit. If none given, the file name is used as circuit name.
:return: A :class:`Circuit` object.
return parse(readtext(file), name=name or str(file))


@ -1,23 +0,0 @@ @@ -1,23 +0,0 @@
import numpy as np
import importlib.util
if importlib.util.find_spec('numba') is not None:
import numba
from . import numba
print('Numba unavailable. Falling back to pure python')
_pop_count_lut = np.asarray([bin(x).count('1') for x in range(256)])
def popcount(a):
return np.sum(_pop_count_lut[a])
_bit_in_lut = np.array([2 ** x for x in range(7, -1, -1)], dtype='uint8')
def bit_in(a, pos):
return a[pos >> 3] & _bit_in_lut[pos & 7]


@ -1,3 +1,10 @@ @@ -1,3 +1,10 @@
"""Data structures for representing non-hierarchical gate-level circuits.
The class :class:`Circuit` is a container of nodes connected by lines.
A node is an instance of class :class:`Node`,
and a line is an instance of class :class:`Line`.
from collections import deque
@ -7,6 +14,9 @@ class GrowingList(list): @@ -7,6 +14,9 @@ class GrowingList(list):
self.extend([None] * (index + 1 - len(self)))
super().__setitem__(index, value)
def free_index(self):
return next((i for i, x in enumerate(self) if x is None), len(self))
class IndexList(list):
def __delitem__(self, index):
@ -19,36 +29,51 @@ class IndexList(list): @@ -19,36 +29,51 @@ class IndexList(list):
class Node:
"""A Node is a named entity in a circuit (e.g. a gate, a standard cell,
a named signal, or a fan-out point) that has connections to other nodes.
Each node contains:
* `self.index`: a circuit-unique integer index.
* `self.kind`: a type describing its function (e.g. 'AND', 'NOR').
The type '__fork__' is special. It signifies a named signal
or a fan-out in the circuit. Any other type is considered a physical cell.
* ``: a name. Names must be unique among all forks and all cells
in the circuit. However, a fork (`self.kind=='__fork__'`) and a cell with
the same name may coexist.
* `self.ins`: a list of input connections (objects of class `Line`)
* `self.outs`: a list of output connections (objects of class `Line`).
"""A node is a named entity in a circuit (e.g. a gate, a standard cell,
a named signal, or a fan-out point) that is connected to other nodes via lines.
The constructor automatically adds the new node to the given circuit.
def __init__(self, circuit, name, kind='__fork__'):
if kind == '__fork__':
if name in circuit.forks:
raise ValueError(f'fork of name {name} already exists.')
assert name not in circuit.forks, f'fork of name {name} already in circuit.'
circuit.forks[name] = self
if name in circuit.cells:
raise ValueError(f'cell of name {name} already exists.')
assert name not in circuit.cells, f'cell of name {name} already in circuit.'
circuit.cells[name] = self
self.index = len(circuit.nodes)
self.circuit = circuit
"""The :class:`Circuit` object the node is part of.
""" = name
"""The name of the node.
Names must be unique among all forks and all cells in the circuit.
However, a fork (:py:attr:`kind` is set to '__fork__') and a cell with the same name may coexist.
self.kind = kind
"""A string describing the type of the node.
Common types are the names from a standard cell library or general gate names like 'AND' or 'NOR'.
If :py:attr:`kind` is set to '__fork__', it receives special treatment.
A `fork` describes a named signal or a fan-out point in the circuit and not a physical `cell` like a gate.
In the circuit, the namespaces of forks and cells are kept separate.
While :py:attr:`name` must be unique among all forks and all cells, a fork can have the same name as a cell.
The :py:attr:`index`, however, is unique among all nodes; a fork cannot have the same index as a cell.
self.index = len(circuit.nodes) - 1
"""A unique and consecutive integer index of the node within the circuit.
It can be used to store additional data about the node :code:`n`
by allocating an array or list :code:`my_data` of length :code:`len(n.circuit.nodes)` and
accessing it by :code:`my_data[n.index]`.
self.ins = GrowingList()
"""A list of input connections (:class:`Line` objects).
self.outs = GrowingList()
"""A list of output connections (:class:`Line` objects).
def __repr__(self):
ins = ' '.join([f'<{line.index}' if line is not None else '<None' for line in self.ins])
@ -56,6 +81,13 @@ class Node: @@ -56,6 +81,13 @@ class Node:
return f'{self.index}:{self.kind}"{}" {ins} {outs}'
def remove(self):
"""Removes the node from its circuit.
Lines may still reference the removed node.
The user must connect such lines to other nodes or remove the lines from the circuit.
To keep the indices consecutive, the node with the highest index within the circuit
will be assigned the index of the removed node.
if self.circuit is not None:
del self.circuit.nodes[self.index]
if self.kind == '__fork__':
@ -66,56 +98,67 @@ class Node: @@ -66,56 +98,67 @@ class Node:
class Line:
"""A Line is a directional 1:1 connection between two Nodes. It always
connects an output of a node (called `driver`) to an input of a node
(called `reader`) and has a circuit-unique index (`self.index`).
"""A line is a directional 1:1 connection between two nodes.
Furthermore, `self.driver_pin` and `self.reader_pin` are the
integer indices of the connected pins of the nodes. They always correspond
to the positions of the line in the connection lists of the nodes:
It always connects an output of one `driver` node to an input of one `reader` node.
If a signal fans out to multiple readers, a '__fork__' node needs to be added.
* `self.driver.outs[self.driver_pin] == self`
* `self.reader.ins[self.reader_pin] == self`
The constructor automatically adds the new line to the given circuit and inserts references into the connection
lists of connected nodes.
A Line always connects a single driver to a single reader. If a signal fans out to
multiple readers, a '__fork__' Node needs to be added.
When adding a line, input and output pins can either be specified explicitly
:code:`Line(circuit, (driver, 2), (reader, 0))`, or implicitly :code:`Line(circuit, driver, reader)`.
In the implicit case, the line will be connected to the first free pin of the node.
Use the explicit case only if connections to specific pins are required.
It may overwrite any previous line references in the connection list of the nodes.
def __init__(self, circuit, driver, reader):
self.index = len(circuit.lines)
if type(driver) is Node:
self.driver = driver
self.driver_pin = len(driver.outs)
for pin, line in enumerate(driver.outs):
if line is None:
self.driver_pin = pin
self.driver, self.driver_pin = driver
if type(reader) is Node:
self.reader = reader
self.reader_pin = len(reader.ins)
for pin, line in enumerate(reader.ins):
if line is None:
self.reader_pin = pin
self.reader, self.reader_pin = reader
self.circuit = circuit
"""The :class:`Circuit` object the line is part of.
self.index = len(self.circuit.lines) - 1
"""A unique and consecutive integer index of the line within the circuit.
It can be used to store additional data about the line :code:`l`
by allocating an array or list :code:`my_data` of length :code:`len(l.circuit.lines)` and
accessing it by :code:`my_data[l.index]`.
if not isinstance(driver, tuple): driver = (driver, driver.outs.free_index())
self.driver = driver[0]
"""The :class:`Node` object that drives this line.
self.driver_pin = driver[1]
"""The output pin position of the driver node this line is connected to.
This is the position in the outs-list of the driving node this line referenced from:
:code:`self.driver.outs[self.driver_pin] == self`.
if not isinstance(reader, tuple): reader = (reader, reader.ins.free_index())
self.reader = reader[0]
"""The :class:`Node` object that reads this line.
self.reader_pin = reader[1]
"""The input pin position of the reader node this line is connected to.
This is the position in the ins-list of the reader node this line referenced from:
:code:`self.reader.ins[self.reader_pin] == self`.
self.driver.outs[self.driver_pin] = self
self.reader.ins[self.reader_pin] = self
def remove(self):
circuit = None
if self.driver is not None:
self.driver.outs[self.driver_pin] = None
circuit = self.driver.circuit
if self.reader is not None:
self.reader.ins[self.reader_pin] = None
circuit = self.reader.circuit
if circuit is not None:
del circuit.lines[self.index]
"""Removes the line from its circuit and its referencing nodes.
To keep the indices consecutive, the line with the highest index within the circuit
will be assigned the index of the removed line.
if self.driver is not None: self.driver.outs[self.driver_pin] = None
if self.reader is not None: self.reader.ins[self.reader_pin] = None
if self.circuit is not None: del self.circuit.lines[self.index]
self.driver = None
self.reader = None
self.circuit = None
def __repr__(self):
return f'{self.index}'
@ -127,27 +170,53 @@ class Line: @@ -127,27 +170,53 @@ class Line:
class Circuit:
"""A Circuit is a container for interconnected nodes and lines.
All contained lines have unique indices, so have all contained nodes.
These indices can be used to store additional data about nodes or lines
by allocating an array `my_data` of length `len(self.nodes)` and then
accessing it by `my_data[n.index]`. The indices may change iff lines or
nodes are removed from the circuit.
It provides access to lines by index and to nodes by index and by name.
Nodes come in two flavors: `cells` and `forks` (see :py:attr:`Node.kind`).
The name spaces of cells and forks are kept separate.
The indices of nodes and lines are kept consecutive and unique.
Whenever lines or nodes are removed from the circuit, the indices of some other lines or nodes may change
to enforce consecutiveness.
Nodes come in two flavors (cells and forks, see `Node`). The names of
these nodes are kept unique within these two flavors.
A subset of nodes can be designated as primary input- or output-ports of the circuit.
This is done by adding them to the :py:attr:`interface` list.
def __init__(self, name=None): = name
"""The name of the circuit.
self.nodes = IndexList()
"""A list of all :class:`Node` objects contained in the circuit.
The position of a node in this list equals its index :code:`self.nodes[42].index == 42`.
self.lines = IndexList()
"""A list of all :class:`Line` objects contained in the circuit.
The position of a line in this list equals its index :code:`self.lines[42].index == 42`.
self.interface = GrowingList()
"""A list of nodes that are designated as primary input- or output-ports.
Port-nodes are contained in :py:attr:`nodes` as well as :py:attr:`interface`.
The position of a node in the interface list corresponds to positions of logic values in test vectors.
The port direction is not stored explicitly.
Usually, nodes in the interface list without any lines in their :py:attr:`Node.ins` list are primary inputs,
and nodes without any lines in their :py:attr:`Node.outs` list are regarded as primary outputs.
self.cells = {}
"""A dictionary to access cells by name.
self.forks = {}
"""A dictionary to access forks by name.
def get_or_add_fork(self, name):
return self.forks[name] if name in self.forks else Node(self, name)
def copy(self):
"""Returns a deep copy of the circuit.
c = Circuit(
for node in self.nodes:
Node(c,, node.kind)
@ -164,6 +233,8 @@ class Circuit: @@ -164,6 +233,8 @@ class Circuit:
return c
def dump(self):
"""Returns a string representation of the circuit and all its nodes.
header = f'{}({",".join([str(n.index) for n in self.interface])})\n'
return header + '\n'.join([str(n) for n in self.nodes])
@ -172,6 +243,11 @@ class Circuit: @@ -172,6 +243,11 @@ class Circuit:
return f'<Circuit{name} with {len(self.nodes)} nodes, {len(self.lines)} lines, {len(self.interface)} ports>'
def topological_order(self):
"""Generator function to iterate over all nodes in topological order.
Nodes without input lines and nodes whose :py:attr:`Node.kind` contains the substring 'DFF' are
yielded first.
visit_count = [0] * len(self.nodes)
queue = deque(n for n in self.nodes if len(n.ins) == 0 or 'DFF' in n.kind)
while len(queue) > 0:
@ -185,12 +261,19 @@ class Circuit: @@ -185,12 +261,19 @@ class Circuit:
yield n
def topological_line_order(self):
"""Generator function to iterate over all lines in topological order.
for n in self.topological_order():
for line in n.outs:
if line is not None:
yield line
def reversed_topological_order(self):
"""Generator function to iterate over all nodes in reversed topological order.
Nodes without output lines and nodes whose :py:attr:`Node.kind` contains the substring 'DFF' are
yielded first.
visit_count = [0] * len(self.nodes)
queue = deque(n for n in self.nodes if len(n.outs) == 0 or 'DFF' in n.kind)
while len(queue) > 0:
@ -203,6 +286,10 @@ class Circuit: @@ -203,6 +286,10 @@ class Circuit:
yield n
def fanin(self, origin_nodes):
"""Generator function to iterate over the fan-in cone of a given list of origin nodes.
Nodes are yielded in reversed topological order.
marks = [False] * len(self.nodes)
for n in origin_nodes:
marks[n.index] = True


@ -0,0 +1,402 @@ @@ -0,0 +1,402 @@
"""This module contains definitions and data structures for 2-, 4-, and 8-valued logic operations.
8 logic values are defined as integer constants.
* For 2-valued logic: ``ZERO`` and ``ONE``
* 4-valued logic adds: ``UNASSIGNED`` and ``UNKNOWN``
* 8-valued logic adds: ``RISE``, ``FALL``, ``PPULSE``, and ``NPULSE``.
The bits in these constants have the following meaning:
* bit 0: Final/settled binary value of a signal
* bit 1: Initial binary value of a signal
* bit 2: Activity or transitions are present on a signal
Special meaning is given to values where bits 0 and 1 differ, but bit 2 (activity) is 0.
These values are interpreted as ``UNKNOWN`` or ``UNASSIGNED`` in 4-valued and 8-valued logic.
In general, 2-valued logic only considers bit 0, 4-valued logic considers bits 0 and 1, and 8-valued logic
considers all 3 bits.
The only exception is constant ``ONE=0b11`` which has two bits set for all logics including 2-valued logic.
import math
from import Iterable
import numpy as np
from . import numba
ZERO = 0b000
"""Integer constant ``0b000`` for logic-0. ``'0'``, ``0``, ``False``, ``'L'``, and ``'l'`` are interpreted as ``ZERO``.
UNKNOWN = 0b001
"""Integer constant ``0b001`` for unknown or conflict. ``'X'``, or any other value is interpreted as ``UNKNOWN``.
"""Integer constant ``0b010`` for unassigned or high-impedance. ``'-'``, ``None``, ``'Z'``, and ``'z'`` are
interpreted as ``UNASSIGNED``.
ONE = 0b011
"""Integer constant ``0b011`` for logic-1. ``'1'``, ``1``, ``True``, ``'H'``, and ``'h'`` are interpreted as ``ONE``.
PPULSE = 0b100
"""Integer constant ``0b100`` for positive pulse, meaning initial and final values are 0, but there is some activity
on a signal. ``'P'``, ``'p'``, and ``'^'`` are interpreted as ``PPULSE``.
RISE = 0b101
"""Integer constant ``0b110`` for a rising transition. ``'R'``, ``'r'``, and ``'/'`` are interpreted as ``RISE``.
FALL = 0b110
"""Integer constant ``0b101`` for a falling transition. ``'F'``, ``'f'``, and ``'\\'`` are interpreted as ``FALL``.
NPULSE = 0b111
"""Integer constant ``0b111`` for negative pulse, meaning initial and final values are 1, but there is some activity
on a signal. ``'N'``, ``'n'``, and ``'v'`` are interpreted as ``NPULSE``.
def interpret(value):
if isinstance(value, Iterable) and not (isinstance(value, str) and len(value) == 1):
return list(map(interpret, value))
if value in [0, '0', False, 'L', 'l']:
return ZERO
if value in [1, '1', True, 'H', 'h']:
return ONE
if value in [None, '-', 'Z', 'z']:
if value in ['R', 'r', '/']:
return RISE
if value in ['F', 'f', '\\']:
return FALL
if value in ['P', 'p', '^']:
return PPULSE
if value in ['N', 'n', 'v']:
return NPULSE
return UNKNOWN
_bit_in_lut = np.array([2 ** x for x in range(7, -1, -1)], dtype='uint8')
def bit_in(a, pos):
return a[pos >> 3] & _bit_in_lut[pos & 7]
def mv_cast(*args, m=8):
return [a if isinstance(a, MVArray) else MVArray(a, m=m) for a in args]
def mv_getm(*args):
return max([a.m for a in args if isinstance(a, MVArray)] + [0]) or 8
def _mv_not(m, out, inp):
np.bitwise_xor(inp, 0b11, out=out) # this also exchanges UNASSIGNED <-> UNKNOWN
if m > 2:
np.putmask(out, (inp == UNKNOWN), UNKNOWN) # restore UNKNOWN
def mv_not(x1, out=None):
m = mv_getm(x1)
x1 = mv_cast(x1, m=m)[0]
out = out or MVArray(, m=m)
return out
def _mv_or(m, out, *ins):
if m > 2:
any_unknown = (ins[0] == UNKNOWN) | (ins[0] == UNASSIGNED)
for inp in ins[1:]: any_unknown |= (inp == UNKNOWN) | (inp == UNASSIGNED)
any_one = (ins[0] == ONE)
for inp in ins[1:]: any_one |= (inp == ONE)
out[...] = ZERO
np.putmask(out, any_one, ONE)
for inp in ins:
np.bitwise_or(out, inp, out=out, where=~any_one)
np.putmask(out, (any_unknown & ~any_one), UNKNOWN)
out[...] = ZERO
for inp in ins: np.bitwise_or(out, inp, out=out)
def mv_or(x1, x2, out=None):
m = mv_getm(x1, x2)
x1, x2 = mv_cast(x1, x2, m=m)
out = out or MVArray(np.broadcast(,, m=m)
return out
def _mv_and(m, out, *ins):
if m > 2:
any_unknown = (ins[0] == UNKNOWN) | (ins[0] == UNASSIGNED)
for inp in ins[1:]: any_unknown |= (inp == UNKNOWN) | (inp == UNASSIGNED)
any_zero = (ins[0] == ZERO)
for inp in ins[1:]: any_zero |= (inp == ZERO)
out[...] = ONE
np.putmask(out, any_zero, ZERO)
for inp in ins:
np.bitwise_and(out, inp | 0b100, out=out, where=~any_zero)
if m > 4: np.bitwise_or(out, inp & 0b100, out=out, where=~any_zero)
np.putmask(out, (any_unknown & ~any_zero), UNKNOWN)
out[...] = ONE
for inp in ins: np.bitwise_and(out, inp, out=out)
def mv_and(x1, x2, out=None):
m = mv_getm(x1, x2)
x1, x2 = mv_cast(x1, x2, m=m)
out = out or MVArray(np.broadcast(,, m=m)
return out
def _mv_xor(m, out, *ins):
if m > 2:
any_unknown = (ins[0] == UNKNOWN) | (ins[0] == UNASSIGNED)
for inp in ins[1:]: any_unknown |= (inp == UNKNOWN) | (inp == UNASSIGNED)
out[...] = ZERO
for inp in ins:
np.bitwise_xor(out, inp & 0b011, out=out)
if m > 4: np.bitwise_or(out, inp & 0b100, out=out)
np.putmask(out, any_unknown, UNKNOWN)
out[...] = ZERO
for inp in ins: np.bitwise_xor(out, inp, out=out)
def mv_xor(x1, x2, out=None):
m = mv_getm(x1, x2)
x1, x2 = mv_cast(x1, x2, m=m)
out = out or MVArray(np.broadcast(,, m=m)
return out
def mv_transition(init, final, out=None):
m = mv_getm(init, final)
init, final = mv_cast(init, final, m=m)
init =
final =
out = out or MVArray(np.broadcast(init, final).shape, m=8)[...] = (init & 0b010) | (final & 0b001)[...] |= (( << 1) ^ ( << 2)) & 0b100
unknown = (init == UNKNOWN) | (init == UNASSIGNED) | (final == UNKNOWN) | (final == UNASSIGNED)
unassigned = (init == UNASSIGNED) & (final == UNASSIGNED)
np.putmask(, unknown, UNKNOWN)
np.putmask(, unassigned, UNASSIGNED)
return out
class MVArray:
"""An n-dimensional array of m-valued logic values.
This class wraps a numpy.ndarray of type uint8 and adds support for encoding and
interpreting 2-valued, 4-valued, and 8-valued logic values.
Each logic value is stored as an uint8, value manipulations are cheaper than in BPArray.
An MVArray always has 2 axes:
* Axis 0 is PI/PO/FF position, the length of this axis is called "width".
* Axis 1 is vector/pattern, the length of this axis is called "length".
def __init__(self, a, m=None):
self.m = m or 8
assert self.m in [2, 4, 8]
# Try our best to interpret given a.
if isinstance(a, MVArray): =
self.m = m or a.m
elif hasattr(a, 'data'): # assume it is a BPArray. Can't use isinstance() because BPArray isn't declared yet. = np.zeros((a.width, a.length), dtype=np.uint8)
self.m = m or a.m
for i in range([-2]):[...] <<= 1[...] |= np.unpackbits([..., -i-1, :], axis=1)[:, :a.length]
if[-2] == 1: *= 3
elif isinstance(a, int): = np.full((a, 1), UNASSIGNED, dtype=np.uint8)
elif isinstance(a, tuple): = np.full(a, UNASSIGNED, dtype=np.uint8)
if isinstance(a, str): a = [a] = np.asarray(interpret(a), dtype=np.uint8) =[:, np.newaxis] if == 1 else np.moveaxis(, -2, -1)
# Cast data to m-valued logic.
if self.m == 2:[...] = (( & 0b001) & (( >> 1) & 0b001) | ( == RISE)) * ONE
elif self.m == 4:[...] = ( & 0b011) & (( != FALL) * ONE) | (( == RISE) * ONE)
elif self.m == 8:[...] = & 0b111
self.length =[-1]
self.width =[-2]
def __repr__(self):
return f'<MVArray length={self.length} width={self.width} m={self.m} nbytes={}>'
def __str__(self):
return str([self[idx] for idx in range(self.length)])
def __getitem__(self, vector_idx):
chars = ["0", "X", "-", "1", "P", "R", "F", "N"]
return ''.join(chars[v] for v in[:, vector_idx])
def __len__(self):
return self.length
def bp_buf(out, inp):
md = out.shape[-2]
assert md == inp.shape[-2]
if md > 1:
unknown = inp[..., 0, :] ^ inp[..., 1, :]
if md > 2: unknown &= ~inp[..., 2, :]
out[..., 0, :] = inp[..., 0, :] | unknown
out[..., 1, :] = inp[..., 1, :] & ~unknown
if md > 2: out[..., 2, :] = inp[..., 2, :] & ~unknown
out[..., 0, :] = inp[..., 0, :]
def bp_not(out, inp):
md = out.shape[-2]
assert md == inp.shape[-2]
if md > 1:
unknown = inp[..., 0, :] ^ inp[..., 1, :]
if md > 2: unknown &= ~inp[..., 2, :]
out[..., 0, :] = ~inp[..., 0, :] | unknown
out[..., 1, :] = ~inp[..., 1, :] & ~unknown
if md > 2: out[..., 2, :] = inp[..., 2, :] & ~unknown
out[..., 0, :] = ~inp[..., 0, :]
def bp_or(out, *ins):
md = out.shape[-2]
for inp in ins: assert md == inp.shape[-2]
out[...] = 0
if md == 1:
for inp in ins: out[..., 0, :] |= inp[..., 0, :]
elif md == 2:
any_unknown = ins[0][..., 0, :] ^ ins[0][..., 1, :]
for inp in ins[1:]: any_unknown |= inp[..., 0, :] ^ inp[..., 1, :]
any_one = ins[0][..., 0, :] & ins[0][..., 1, :]
for inp in ins[1:]: any_one |= inp[..., 0, :] & inp[..., 1, :]
for inp in ins:
out[..., 0, :] |= inp[..., 0, :] | any_unknown
out[..., 1, :] |= inp[..., 1, :] & (~any_unknown | any_one)
any_unknown = (ins[0][..., 0, :] ^ ins[0][..., 1, :]) & ~ins[0][..., 2, :]
for inp in ins[1:]: any_unknown |= (inp[..., 0, :] ^ inp[..., 1, :]) & ~inp[..., 2, :]
any_one = ins[0][..., 0, :] & ins[0][..., 1, :] & ~ins[0][..., 2, :]
for inp in ins[1:]: any_one |= inp[..., 0, :] & inp[..., 1, :] & ~inp[..., 2, :]
for inp in ins:
out[..., 0, :] |= inp[..., 0, :] | any_unknown
out[..., 1, :] |= inp[..., 1, :] & (~any_unknown | any_one)
out[..., 2, :] |= inp[..., 2, :] & (~any_unknown | any_one) & ~any_one
def bp_and(out, *ins):
md = out.shape[-2]
for inp in ins: assert md == inp.shape[-2]
out[...] = 0xff
if md == 1:
for inp in ins: out[..., 0, :] &= inp[..., 0, :]
elif md == 2:
any_unknown = ins[0][..., 0, :] ^ ins[0][..., 1, :]
for inp in ins[1:]: any_unknown |= inp[..., 0, :] ^ inp[..., 1, :]
any_zero = ~ins[0][..., 0, :] & ~ins[0][..., 1, :]
for inp in ins[1:]: any_zero |= ~inp[..., 0, :] & ~inp[..., 1, :]
for inp in ins:
out[..., 0, :] &= inp[..., 0, :] | (any_unknown & ~any_zero)
out[..., 1, :] &= inp[..., 1, :] & ~any_unknown
any_unknown = (ins[0][..., 0, :] ^ ins[0][..., 1, :]) & ~ins[0][..., 2, :]
for inp in ins[1:]: any_unknown |= (inp[..., 0, :] ^ inp[..., 1, :]) & ~inp[..., 2, :]
any_zero = ~ins[0][..., 0, :] & ~ins[0][..., 1, :] & ~ins[0][..., 2, :]
for inp in ins[1:]: any_zero |= ~inp[..., 0, :] & ~inp[..., 1, :] & ~inp[..., 2, :]
out[..., 2, :] = 0
for inp in ins:
out[..., 0, :] &= inp[..., 0, :] | (any_unknown & ~any_zero)
out[..., 1, :] &= inp[..., 1, :] & ~any_unknown
out[..., 2, :] |= inp[..., 2, :] & (~any_unknown | any_zero) & ~any_zero
def bp_xor(out, *ins):
md = out.shape[-2]
for inp in ins: assert md == inp.shape[-2]
out[...] = 0
if md == 1:
for inp in ins: out[..., 0, :] ^= inp[..., 0, :]
elif md == 2:
any_unknown = ins[0][..., 0, :] ^ ins[0][..., 1, :]
for inp in ins[1:]: any_unknown |= inp[..., 0, :] ^ inp[..., 1, :]
for inp in ins: out[...] ^= inp
out[..., 0, :] |= any_unknown
out[..., 1, :] &= ~any_unknown
any_unknown = (ins[0][..., 0, :] ^ ins[0][..., 1, :]) & ~ins[0][..., 2, :]
for inp in ins[1:]: any_unknown |= (inp[..., 0, :] ^ inp[..., 1, :]) & ~inp[..., 2, :]
for inp in ins:
out[..., 0, :] ^= inp[..., 0, :]
out[..., 1, :] ^= inp[..., 1, :]
out[..., 2, :] |= inp[..., 2, :]
out[..., 0, :] |= any_unknown
out[..., 1, :] &= ~any_unknown
out[..., 2, :] &= ~any_unknown
class BPArray:
"""An n-dimensional array of m-valued logic values that uses bit-parallel storage.
The primary use of this format is in aiding efficient bit-parallel logic simulation.
The secondary benefit over MVArray is its memory efficiency.
Accessing individual values is more expensive than with :py:class:`MVArray`.
It is advised to first construct a MVArray, pack it into a :py:class:`BPArray` for simulation and unpack the results
back into a :py:class:`MVArray` for value access.
The values along the last axis (vectors/patterns) are packed into uint8 words.
The second-last axis has length ceil(log2(m)) for storing all bits.
All other axes stay the same as in MVArray.
def __init__(self, a, m=None):
if not isinstance(a, MVArray) and not isinstance(a, BPArray):
a = MVArray(a, m)
self.m = a.m
if isinstance(a, MVArray):
if m is not None and m != a.m:
a = MVArray(a, m) # cast data
self.m = a.m
assert self.m in [2, 4, 8]
nwords = math.ceil(math.log2(self.m))
nbytes = ([-1] - 1) // 8 + 1 = np.zeros([:-1] + (nwords, nbytes), dtype=np.uint8)
for i in range([-2]):[..., i, :] = np.packbits(( >> i) & 1, axis=-1)
else: # we have a BPArray = # TODO: support conversion to different m
self.m = a.m
self.length = a.length
self.width = a.width
def __repr__(self):
return f'<BPArray length={self.length} width={self.width} m={self.m} bytes={}>'
def __len__(self):
return self.length


@ -1,21 +1,25 @@ @@ -1,21 +1,25 @@
import math
import numpy as np
from . import packed_vectors
from . import logic
class LogicSim:
"""A bit-parallel naive combinational logic simulator supporting 1, 4, or 8-valued logics.
"""A bit-parallel naïve combinational simulator for 2-, 4-, or 8-valued logic.
def __init__(self, circuit, nvectors=1, vdim=1):
def __init__(self, circuit, sims=1, m=8):
assert m in [2, 4, 8]
self.m = m
mdim = math.ceil(math.log2(m))
self.circuit = circuit
self.nvectors = nvectors
nbytes = (nvectors - 1) // 8 + 1
self.sims = sims
nbytes = (sims - 1) // 8 + 1
self.interface = list(circuit.interface) + [n for n in circuit.nodes if 'dff' in n.kind.lower()]
self.state = np.zeros((len(circuit.lines), vdim, nbytes), dtype='uint8')
self.state = np.zeros((len(circuit.lines), mdim, nbytes), dtype='uint8')
self.state_epoch = np.zeros(len(circuit.nodes), dtype='int8') - 1
self.tmp = np.zeros((5, vdim, nbytes), dtype='uint8') = np.zeros((vdim, nbytes), dtype='uint8')
if vdim > 1:[1] = 255
self.tmp = np.zeros((5, mdim, nbytes), dtype='uint8') = np.zeros((mdim, nbytes), dtype='uint8')
self.epoch = 0
self.fork_vd1 = self.fork_vdx
@ -45,23 +49,23 @@ class LogicSim: @@ -45,23 +49,23 @@ class LogicSim:
self.nbuff_vd3 = self.fork_vd3
self.xor2_vd3 = self.xor_vd3
known_fct = [(f[:-4], getattr(self, f)) for f in dir(self) if f.endswith(f'_vd{vdim}')]
known_fct = [(f[:-4], getattr(self, f)) for f in dir(self) if f.endswith(f'_vd{mdim}')]
self.node_fct = []
for n in circuit.nodes:
t = n.kind.lower().replace('__fork__', 'fork')
t = t.replace('__const0__', 'const0')
t = t.replace('__const1__', 'const1')
t = t.replace('tieh', 'const1')
# t = t.replace('xor', 'or').replace('xnor', 'nor')
fcts = [f for n, f in known_fct if t.startswith(n)]
if len(fcts) < 1:
raise ValueError(f'Unknown node kind {n.kind}')
def assign(self, stimuli):
if isinstance(stimuli, packed_vectors.PackedVectors):
stimuli = stimuli.bits
for (stim, node) in zip(stimuli, self.interface):
"""Assign stimuli to the primary inputs and state-elements (flip-flops)."""
if hasattr(stimuli, 'data'):
stimuli =
for stim, node in zip(stimuli, self.interface):
if len(node.outs) == 0: continue
outputs = [self.state[line.index] if line else self.tmp[3] for line in node.outs]
self.node_fct[node.index]([stim], outputs)
@ -78,13 +82,16 @@ class LogicSim: @@ -78,13 +82,16 @@ class LogicSim:
self.state_epoch[line.reader.index] = self.epoch
def capture(self, responses):
if isinstance(responses, packed_vectors.PackedVectors):
responses = responses.bits
for (resp, node) in zip(responses, self.interface):
"""Capture the current values at the primary outputs and in the state-elements (flip-flops)."""
if hasattr(responses, 'data'):
responses =
for resp, node in zip(responses, self.interface):
if len(node.ins) == 0: continue
resp[...] = self.state[node.ins[0].index]
# print(responses)
def propagate(self):
"""Propagate the input values towards the outputs (Perform all logic operations in topological order)."""
for node in self.circuit.topological_order():
if self.state_epoch[node.index] != self.epoch: continue
inputs = [self.state[line.index] if line else for line in node.ins]
@ -95,8 +102,7 @@ class LogicSim: @@ -95,8 +102,7 @@ class LogicSim:
self.state_epoch[line.reader.index] = self.epoch
self.epoch = (self.epoch + 1) % 128
def fork_vdx(inputs, outputs):
def fork_vdx(self, inputs, outputs):
for o in outputs: o[...] = inputs[0]
def const0_vdx(self, _, outputs):
@ -104,40 +110,34 @@ class LogicSim: @@ -104,40 +110,34 @@ class LogicSim:
# 2-valued simulation
def not_vd1(inputs, outputs):
def not_vd1(self, inputs, outputs):
outputs[0][0] = ~inputs[0][0]
def const1_vd1(self, _, outputs):
for o in outputs: o[...] =
self.not_vd1(outputs, outputs)
def and_vd1(inputs, outputs):
def and_vd1(self, inputs, outputs):
o = outputs[0]
o[0] = inputs[0][0]
for i in inputs[1:]: o[0] &= i[0]
def or_vd1(inputs, outputs):
def or_vd1(self, inputs, outputs):
o = outputs[0]
o[0] = inputs[0][0]
for i in inputs[1:]: o[0] |= i[0]
def xor_vd1(inputs, outputs):
def xor_vd1(self, inputs, outputs):
o = outputs[0]
o[0] = inputs[0][0]
for i in inputs[1:]: o[0] ^= i[0]
def sdff_vd1(inputs, outputs):
def sdff_vd1(self, inputs, outputs):
outputs[0][0] = inputs[0][0]
if len(outputs) > 1:
outputs[1][0] = ~inputs[0][0]
def dff_vd1(inputs, outputs):
def dff_vd1(self, inputs, outputs):
outputs[0][0] = inputs[0][0]
if len(outputs) > 1:
outputs[1][0] = ~inputs[0][0]
@ -155,93 +155,26 @@ class LogicSim: @@ -155,93 +155,26 @@ class LogicSim:
self.not_vd1(outputs, outputs)
# 4-valued simulation
# sym [0] [1] (value, care)
# 0 0 1
# 1 1 1
# - 0 0
# X 1 0
def not_vd2(inputs, outputs):
# 4-valued not:
# i: 0 1 - X
# o: 1 0 X X
# o0 1 0 1 1
# o1 1 1 0 0
outputs[0][0] = ~inputs[0][0] | ~inputs[0][1] # value = 0 or DC
outputs[0][1] = inputs[0][1] # care = C
def not_vd2(self, inputs, outputs):
logic.bp_not(outputs[0], inputs[0])
def and_vd2(self, inputs, outputs):
# 4-valued: o[0]: o[1]:
# 0 1 - X 0 1 - X 0 1 - X
# 0 0 0 0 0 0 0 0 0 1 1 1 1
# 1 0 1 X X 0 1 1 1 1 1 0 0
# - 0 X X X 0 1 1 1 1 0 0 0
# X 0 X X X 0 1 1 1 1 0 0 0
i = inputs[0]
any0 = self.tmp[0]
anyd = self.tmp[1]
any0[0] = ~i[0] & i[1]
anyd[0] = ~i[1]
for i in inputs[1:]:
any0[0] |= ~i[0] & i[1]
anyd[0] |= ~i[1]
o = outputs[0]
o[0] = ~any0[0] # value = no0
o[1] = any0[0] | ~anyd[0] # care = any0 or noDC
logic.bp_and(outputs[0], *inputs)
def or_vd2(self, inputs, outputs):
# 4-valued: o[0]: o[1]:
# 0 1 - X 0 1 - X 0 1 - X
# 0 0 1 X X 0 1 1 1 1 1 0 0
# 1 1 1 1 1 1 1 1 1 1 1 1 1
# - X 1 X X 1 1 1 1 0 1 0 0
# X X 1 X X 1 1 1 1 0 1 0 0
i = inputs[0]
any1 = self.tmp[0]
anyd = self.tmp[1]
any1[0] = i[0] & i[1]
anyd[0] = ~i[1]
for i in inputs[1:]:
any1[0] |= i[0] & i[1]
anyd[0] |= ~i[1]
o = outputs[0]
o[0] = any1[0] | anyd[0] # value = any1 or anyDC
o[1] = any1[0] | ~anyd[0] # care = any1 or noDC
logic.bp_or(outputs[0], *inputs)
def xor_vd2(self, inputs, outputs):
# 4-valued: o[0]: o[1]:
# 0 1 - X 0 1 - X 0 1 - X
# 0 0 1 X X 0 1 1 1 1 1 0 0
# 1 1 0 X X 1 0 1 1 1 1 0 0
# - X X X X 1 1 1 1 0 0 0 0
# X X X X X 1 1 1 1 0 0 0 0
i = inputs[0]
odd1 = self.tmp[0]
anyd = self.tmp[1]
odd1[0] = i[0] & i[1]
anyd[0] = ~i[1]
for i in inputs[1:]:
odd1[0] ^= i[0] & i[1]
anyd[0] |= ~i[1]
o = outputs[0]
o[0] = odd1[0] | anyd[0] # value = odd1 or anyDC
o[1] = ~anyd[0] # care = noDC
logic.bp_xor(outputs[0], *inputs)
def sdff_vd2(self, inputs, outputs):
self.dff_vd2(inputs, outputs)
if len(outputs) > 1:
outputs[1][0] = ~inputs[0][0] | ~inputs[0][1] # value = 0 or DC
outputs[1][1] = inputs[0][1] # care = C
logic.bp_not(outputs[1], inputs[0])
def dff_vd2(inputs, outputs):
outputs[0][0] = inputs[0][0] | ~inputs[0][1] # value = 1 or DC
outputs[0][1] = inputs[0][1] # care = C
def dff_vd2(self, inputs, outputs):
logic.bp_buf(outputs[0], inputs[0])
def nand_vd2(self, inputs, outputs):
self.and_vd2(inputs, outputs)
@ -260,149 +193,26 @@ class LogicSim: @@ -260,149 +193,26 @@ class LogicSim:
self.not_vd2(outputs, outputs)
# 8-valued simulation
# sym [0] [1] [2] (initial value, ~final value, toggles present?)
# 0 0 1 0
# 1 1 0 0
# - 0 0 0
# X 1 1 0
# R 0 0 1 _/"
# F 1 1 1 "\_
# P 0 1 1 _/\_
# N 1 0 1 "\/"
def not_vd3(self, inputs, outputs):
# 8-valued not:
# i: 0 1 - X R F P N
# i0 0 1 0 1 0 1 0 1
# i1 1 0 0 1 0 1 1 0
# i2 0 0 0 0 1 1 1 1
# o: 1 0 X X F R N P
# o0 1 0 1 1 1 0 1 0
# o1 0 1 1 1 1 0 0 1
# o2 0 0 0 0 1 1 1 1
i = inputs[0]
dc = self.tmp[0]
dc[0] = ~(i[0] ^ i[1]) & ~i[2]
dc = self.tmp[0]
outputs[0][0] = ~i[0] | dc[0] # init.v = ~i0 or DC
outputs[0][1] = ~i[1] | dc[0] # init.v = ~i1 or DC
outputs[0][2] = i[2] # toggles = i2
logic.bp_not(outputs[0], inputs[0])
def and_vd3(self, inputs, outputs):
# 8-valued: o[0]: o[1]: o[2]:
# 0 1 - X R F P N 0 1 - X R F P N 0 1 - X R F P N 0 1 - X R F P N
# 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0
# 1 0 1 X X R F P N 0 1 1 1 0 1 0 1 1 0 1 1 0 1 1 0 0 0 0 0 1 1 1 1
# - 0 X X X X X X X 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0
# X 0 X X X X X X X 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0
# R 0 R X X R R P R 0 0 1 1 0 0 0 0 1 0 1 1 0 0 1 0 0 1 0 0 1 1 1 1
# F 0 F X X R F P F 0 1 1 1 0 1 0 1 1 1 1 1 0 1 1 1 0 1 0 0 1 1 1 1
# P 0 P X X P P P P 0 0 1 1 0 0 0 0 1 1 1 1 1 1 1 1 0 1 0 0 1 1 1 1
# N 0 N X X R F P N 0 1 1 1 0 1 0 1 1 0 1 1 0 1 1 0 0 1 0 0 1 1 1 1
i = inputs[0]
anyi0 = self.tmp[0]
anyf0 = self.tmp[1]
anyd = self.tmp[2]
any0 = self.tmp[3]
any_t = self.tmp[4]
anyd[0] = ~(i[0] ^ i[1]) & ~i[2]
anyi0[0] = ~i[0] & ~anyd[0]
anyf0[0] = i[1] & ~anyd[0]
any_t[0] = i[2]
any0[0] = anyi0[0] & anyf0[0] & ~i[2]
for i in inputs[1:]:
dc = ~(i[0] ^ i[1]) & ~i[2]
anyd[0] |= dc
anyi0[0] |= ~i[0] & ~dc
anyf0[0] |= i[1] & ~dc
any_t[0] |= i[2]
any0[0] |= ~i[0] & ~dc & i[1] & ~i[2]
o = outputs[0]
o[0] = (~anyi0[0] | anyd[0]) & ~any0[0] # initial = no_i0 or DC
o[1] = anyf0[0] | anyd[0] # ~final = ~no_f0 or DC
o[2] = any_t[0] & ~(anyd[0] | any0[0]) # toggle = anyT and noDC and no0
logic.bp_and(outputs[0], *inputs)
def or_vd3(self, inputs, outputs):
# 8-valued: o[0]: o[1]: o[2]:
# 0 1 - X R F P N 0 1 - X R F P N 0 1 - X R F P N 0 1 - X R F P N
# 0 0 1 X X R F P N 0 1 1 1 0 1 0 1 1 0 1 1 0 1 1 0 0 0 0 0 1 1 1 1
# 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
# - X 1 X X X X X X 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 0 0 0 0 0 0 0 0
# X X 1 X X X X X X 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 0 0 0 0 0 0 0 0
# R R 1 X X R N R R 0 1 1 1 0 1 0 0 0 0 1 1 0 0 0 0 1 0 0 0 1 1 1 1
# F F 1 X X N F F F 1 1 1 1 1 1 1 1 1 0 1 1 0 1 1 1 1 0 0 0 1 1 1 1
# P P 1 X X R F P N 0 1 1 1 0 1 0 1 1 0 1 1 0 1 1 0 1 0 0 0 1 1 1 1
# N N 1 X X R F N N 1 1 1 1 0 1 1 1 0 0 1 1 0 1 0 0 1 0 0 0 1 1 1 1
i = inputs[0]
anyi1 = self.tmp[0]
anyf1 = self.tmp[1]
anyd = self.tmp[2]
any1 = self.tmp[3]
any_t = self.tmp[4]
anyd[0] = ~(i[0] ^ i[1]) & ~i[2]
anyi1[0] = i[0] & ~anyd[0]
anyf1[0] = ~i[1] & ~anyd[0]
any_t[0] = i[2]
any1[0] = (anyi1[0] & anyf1[0]) & ~i[2]
for i in inputs[1:]:
dc = ~(i[0] ^ i[1]) & ~i[2]
anyd[0] |= dc
anyi1[0] |= i[0] & ~dc
anyf1[0] |= ~i[1] & ~dc
any_t[0] |= i[2]
any1[0] |= i[0] & ~dc & ~i[1] & ~i[2]
o = outputs[0]
o[0] = anyi1[0] | anyd[0] # initial = i1 or DC
o[1] = (~anyf1[0] | anyd[0]) & ~any1[0] # ~final = f1 or DC
o[2] = any_t[0] & ~(anyd[0] | any1[0]) # toggle = anyT and no(DC or 1)
logic.bp_or(outputs[0], *inputs)
def xor_vd3(self, inputs, outputs):
# 8-valued: o[0]: o[1]: o[2]:
# 0 1 - X R F P N 0 1 - X R F P N 0 1 - X R F P N 0 1 - X R F P N
# 0 0 1 X X R F P N 0 1 1 1 0 1 0 1 1 0 1 1 0 1 1 0 0 0 0 0 1 1 1 1
# 1 1 0 X X F R N P 1 0 1 1 1 0 1 0 0 1 1 1 1 0 0 1 0 0 0 0 1 1 1 1
# - X X X X X X X X 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0
# X X X X X X X X X 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0
# R R F X X P N R F 0 1 1 1 0 1 0 1 0 1 1 1 1 0 0 1 1 1 0 0 1 1 1 1
# F F R X X N P F R 1 0 1 1 1 0 1 0 1 0 1 1 0 1 1 0 1 1 0 0 1 1 1 1
# P P N X X R F P N 0 1 1 1 0 1 0 1 1 0 1 1 0 1 1 0 1 1 0 0 1 1 1 1
# N N P X X F R N P 1 0 1 1 1 0 1 0 0 1 1 1 1 0 0 1 1 1 0 0 1 1 1 1
i = inputs[0]
odd0 = self.tmp[0]
odd1 = self.tmp[1]
anyd = self.tmp[2]
anyt = self.tmp[3]
odd0[0] = i[0]
odd1[0] = i[1]
anyd[0] = ~(i[0] ^ i[1]) & ~i[2]
anyt[0] = i[2]
for i in inputs[1:]:
odd0[0] ^= i[0]
odd1[0] ^= i[1]
anyd[0] |= ~(i[0] ^ i[1]) & ~i[2]
anyt[0] |= i[2]
o = outputs[0]
o[0] = odd0[0] | anyd[0]
o[1] = ~odd1[0] | anyd[0]
o[2] = anyt[0] & ~anyd[0]
logic.bp_xor(outputs[0], *inputs)
def sdff_vd3(self, inputs, outputs):
self.dff_vd3(inputs, outputs)
if len(outputs) > 1:
i = inputs[0]
dc = self.tmp[0]
dc[0] = ~(i[0] ^ i[1]) & ~i[2]
outputs[1][0] = ~i[0] | dc[0] # value = 1 or DC
outputs[1][1] = ~i[1] | dc[0] # value = 1 or DC
outputs[1][2] = i[2] # toggle = T
logic.bp_not(outputs[1], inputs[0])
def dff_vd3(self, inputs, outputs):
i = inputs[0]
dc = self.tmp[0]
dc[0] = ~(i[0] ^ i[1]) & ~i[2]
outputs[0][0] = i[0] | dc[0] # value = 1 or DC
outputs[0][1] = i[1] | dc[0] # value = 1 or DC
outputs[0][2] = i[2] # toggle = T
logic.bp_buf(outputs[0], inputs[0])
def nand_vd3(self, inputs, outputs):
self.and_vd3(inputs, outputs)


@ -1,299 +0,0 @@ @@ -1,299 +0,0 @@
import numpy as np
from .bittools import popcount, bit_in
class PackedVectors:
def __init__(self, nvectors=8, width=1, vdim=1, from_cache=None):
if from_cache is not None:
self.bits = np.array(from_cache)
self.width, self.vdim, nbytes = self.bits.shape
self.bits = np.zeros((width, vdim, (nvectors - 1) // 8 + 1), dtype='uint8')
self.vdim = vdim
self.width = width
self.nvectors = nvectors
m1 = np.array([2 ** x for x in range(7, -1, -1)], dtype='uint8')
m0 = ~m1
self.mask = np.rollaxis(np.vstack((m0, m1)), 1)
def from_pair(cls, init, final):
assert init.nvectors == final.nvectors
assert len(init.bits) == len(final.bits)
init_v = init.bits[:, 0]
if init.vdim == 3:
init_c = (init.bits[:, 0] ^ init.bits[:, 1]) | init.bits[:, 2]
elif init.vdim == 2:
init_c = init.bits[:, 1]
init_c = ~np.zeros_like(init.bits[:, 0])
final_v = final.bits[:, 0]
if final.vdim == 3:
final_c = (final.bits[:, 0] ^ final.bits[:, 1]) | final.bits[:, 2]
final_v = ~final.bits[:, 1]
elif final.vdim == 2:
final_c = final.bits[:, 1]
final_c = ~np.zeros_like(final.bits[:, 0])
c = init_c & final_c
a0 = init_v & c
a1 = ~final_v & c
a2 = (init_v ^ final_v) & c
p = PackedVectors(init.nvectors, len(init.bits), 3)
p.bits[:, 0] = a0
p.bits[:, 1] = a1
p.bits[:, 2] = a2
return p
def transition_vectors(self):
a = PackedVectors(self.nvectors-1, self.width, 3)
for pos in range(self.width):
for vidx in range(self.nvectors-1):
tr = self.get_value(vidx, pos) + self.get_value(vidx+1, pos)
if tr == '00':
a.set_value(vidx, pos, '0')
elif tr == '11':
a.set_value(vidx, pos, '1')
elif tr == '01':
a.set_value(vidx, pos, 'R')
elif tr == '10':
a.set_value(vidx, pos, 'F')
elif tr == '--':
a.set_value(vidx, pos, '-')
a.set_value(vidx, pos, 'X')
return a
def __add__(self, other):
a = PackedVectors(self.nvectors + other.nvectors, self.width, max(self.vdim, other.vdim))
# a.bits[:self.bits.shape[0], 0] = self.bits[:, 0]
# if self.vdim == 2:
# a.bits[:self.bits.shape[0], 1] = self.care_bits
# elif self.vdim == 3:
# a.bits[:self.bits.shape[0], 1] = ~self.value_bits
# a.bits[:self.bits.shape[0], 2] = self.toggle_bits
for i in range(self.nvectors):
a[i] = self[i]
for i in range(len(other)):
a[self.nvectors+i] = other[i]
return a
def __len__(self):
return self.nvectors
def randomize(self, one_probability=0.5):
for data in self.bits:
data[0] = np.packbits((np.random.rand(self.nvectors) < one_probability).astype(int))
if self.vdim == 2:
data[1] = 255
elif self.vdim == 3:
data[1] = ~np.packbits((np.random.rand(self.nvectors) < one_probability).astype(int))
data[2] = data[0] ^ ~data[1]
def copy(self, selection_mask=None):
if selection_mask is not None:
cpy = PackedVectors(popcount(selection_mask), len(self.bits), self.vdim)
cur = 0
for vidx in range(self.nvectors):
if bit_in(selection_mask, vidx):
cpy[cur] = self[vidx]
cur += 1
cpy = PackedVectors(self.nvectors, len(self.bits), self.vdim)
np.copyto(cpy.bits, self.bits)
return cpy
def care_bits(self):
if self.vdim == 1:
return self.bits[:, 0] | 255
elif self.vdim == 2:
return self.bits[:, 1]
elif self.vdim == 3:
return (self.bits[:, 0] ^ self.bits[:, 1]) | self.bits[:, 2]
def initial_bits(self):
return self.bits[:, 0]
def value_bits(self):
if self.vdim == 3:
return ~self.bits[:, 1]
return self.bits[:, 0]
def toggle_bits(self):
if self.vdim == 3:
return self.bits[:, 2]
return self.bits[:, 0] & 0
def get_value(self, vector, position):
if vector >= self.nvectors:
raise IndexError(f'vector out of range: {vector} >= {self.nvectors}')
a = self.bits[position, :, vector // 8]
m = self.mask[vector % 8]
if self.vdim == 1:
return '1' if a[0] & m[1] else '0'
elif self.vdim == 2:
if a[0] & m[1]:
return '1' if a[1] & m[1] else 'X'
return '0' if a[1] & m[1] else '-'
elif self.vdim == 3:
if a[2] & m[1]:
if a[0] & m[1]:
return 'F' if a[1] & m[1] else 'N'
return 'P' if a[1] & m[1] else 'R'
if a[0] & m[1]:
return 'X' if a[1] & m[1] else '1'
return '0' if a[1] & m[1] else '-'
def get_values_for_position(self, position):
return ''.join(self.get_value(x, position) for x in range(self.nvectors))
def set_value(self, vector, position, v):
if vector >= self.nvectors:
raise IndexError(f'vector out of range: {vector} >= {self.nvectors}')
a = self.bits[position, :, vector // 8]
m = self.mask[vector % 8]
if self.vdim == 1:
self._set_value_vd1(a, m, v)
elif self.vdim == 2:
self._set_value_vd2(a, m, v)
elif self.vdim == 3:
self._set_value_vd3(a, m, v)
def set_values(self, vector, v, mapping=None, inversions=None):
if vector >= self.nvectors:
raise IndexError(f'vector out of range: {vector} >= {self.nvectors}')
if not mapping:
mapping = [y for y in range(len(v))]
if inversions is None:
inversions = [False] * len(v)
for i, c in enumerate(v):
if inversions[i]:
if c == '1':
c = '0'
elif c == '0':
c = '1'
elif c == 'H':
c = 'L'
elif c == 'L':
c = 'H'
elif c == 'R':
c = 'F'
elif c == 'F':
c = 'R'
self.set_value(vector, mapping[i], c)
def set_values_for_position(self, position, values):
for i, v in enumerate(values):
self.set_value(i, position, v)
def __setitem__(self, vector, value):
for i, c in enumerate(value):
self.set_value(vector, i, c)
def __getitem__(self, vector):
if isinstance(vector, slice):
first = self.get_values_for_position(0)[vector]
ret = PackedVectors(len(first), self.width, self.vdim)
ret.set_values_for_position(0, first)
for pos in range(1, self.width):
ret.set_values_for_position(pos, self.get_values_for_position(pos)[vector])
return ret
return ''.join(self.get_value(vector, pos) for pos in range(len(self.bits)))
def _set_value_vd1(a, m, v):
if v in [True, 1, '1', 'H', 'h']:
a[0] |= m[1]
a[0] &= m[0]
def _set_value_vd2(a, m, v):
if v in [True, 1, '1', 'H', 'h']:
a[0] |= m[1]
a[1] |= m[1]
elif v in [False, 0, '0', 'L', 'l']:
a[0] &= m[0]
a[1] |= m[1]
elif v in ['X', 'x']:
a[0] |= m[1]
a[1] &= m[0]
a[0] &= m[0]
a[1] &= m[0]
# i fb act
# a 0 1 2
# - 0 0 0 None, '-'
# 0 0 1 0 False, 0, '0', 'l', 'L'
# 1 1 0 0 True, 1, '1', 'h', 'H'
# X 1 1 0 'x', 'X'
# / 0 0 1 '/', 'r', 'R'
# ^ 0 1 1 '^', 'p', 'P'
# v 1 0 1 'v', 'n', 'N'
# \ 1 1 1 '\', 'f', 'F'
def _set_value_vd3(a, m, v):
if v in [False, 0, '0', 'L', 'l']:
a[0] &= m[0]
a[1] |= m[1]
a[2] &= m[0]
elif v in [True, 1, '1', 'H', 'h']:
a[0] |= m[1]
a[1] &= m[0]
a[2] &= m[0]
elif v in ['X', 'x']:
a[0] |= m[1]
a[1] |= m[1]
a[2] &= m[0]
elif v in ['/', 'r', 'R']:
a[0] &= m[0]
a[1] &= m[0]
a[2] |= m[1]
elif v in ['^', 'p', 'P']:
a[0] &= m[0]
a[1] |= m[1]
a[2] |= m[1]
elif v in ['v', 'n', 'N']:
a[0] |= m[1]
a[1] &= m[0]
a[2] |= m[1]
elif v in ['\\', 'f', 'F']:
a[0] |= m[1]
a[1] |= m[1]
a[2] |= m[1]
a[0] &= m[0]
a[1] &= m[0]
a[2] &= m[0]
def __repr__(self):
return f'<PackedVectors nvectors={self.nvectors}, width={self.width}, vdim={self.vdim}>'
def __str__(self):
lst = []
for p in range(self.nvectors):
lst.append(''.join(self.get_value(p, w) for w in range(len(self.bits))))
if len(lst) == 0: return ''
if len(lst[0]) > 64:
lst = [s[:32] + '...' + s[-32:] for s in lst]
if len(lst) <= 16:
return '\n'.join(lst)
return '\n'.join(lst[:8]) + '\n...\n' + '\n'.join(lst[-8:])
def diff(self, other, out=None):
if out is None:
out = np.zeros((self.width, self.bits.shape[-1]), dtype='uint8')
out[...] = (self.value_bits ^ other.value_bits) & self.care_bits & other.care_bits
return out


@ -1,14 +1,28 @@ @@ -1,14 +1,28 @@
"""A simple and incomplete parser for the Standard Delay Format (SDF).
The main purpose of this parser is to extract pin-to-pin delay and interconnect delay information from SDF files.
Sophisticated timing specifications (timing checks, conditional delays, etc.) are currently not supported.
The functions :py:func:`load` and :py:func:`read` return an intermediate representation (:class:`DelayFile` object).
Call :py:func:`DelayFile.annotation` to match the intermediate representation to a given circuit.
from collections import namedtuple
import numpy as np
from lark import Lark, Transformer
from collections import namedtuple
from . import log
import gzip
from . import log, readtext
Interconnect = namedtuple('Interconnect', ['orig', 'dest', 'r', 'f'])
IOPath = namedtuple('IOPath', ['ipin', 'opin', 'r', 'f'])
class DelayFile:
"""An intermediate representation of an SDF file.
def __init__(self, name, cells): = name
if None in cells:
@ -22,26 +36,26 @@ class DelayFile: @@ -22,26 +36,26 @@ class DelayFile:
'\n'.join(str(i) for i in self.interconnects)
def annotation(self, circuit, pin_index_f, dataset=1, interconnect=True, ffdelays=True):
Constructs an 3-dimensional array with timing data for each line in `circuit`.
Dimension 1 of the returned array is the line index.
Dimension 2 is the type of timing data: 0:`delay`, 1:`pulse rejection limit`.
Dimension 3 is the polarity at the output of the reading node: 0:`rising`, 1:`falling`.
The polarity for pulse rejection is determined by the latter transition of the pulse.
E.g., timing[42,1,0] is the rejection limit of a negative pulse at the output of the reader of line 42.
"""Constructs an 3-dimensional ndarray with timing data for each line in ``circuit``.
An IOPATH delay for a node is annotated to the line connected to the input pin specified in the IOPATH.
Currently, only ABSOLUTE IOPATH and INTERCONNECT delays are supported.
Pulse rejection limits are derived from absolute delays, explicit declarations (PATHPULSE etc.) are ignored.
:param circuit:
:param pin_index_f:
:param ffdelays:
:param interconnect:
:param pin_index_f:
:param circuit:
:type dataset: int or tuple
:return: A 3-dimensional ndarray with timing data.
* Axis 0: line index.
* Axis 1: type of timing data: 0=`delay`, 1=`pulse rejection limit`.
* Axis 2: The polarity of the output transition of the reading node: 0=`rising`, 1=`falling`.
The polarity for pulse rejection is determined by the latter transition of the pulse.
E.g., timing[42,1,0] is the rejection limit of a negative pulse at the output of the reader of line 42.
def select_del(_delvals, idx):
if type(dataset) is tuple:
@ -170,7 +184,6 @@ class SdfTransformer(Transformer): @@ -170,7 +184,6 @@ class SdfTransformer(Transformer):
return DelayFile(name, cells)
def parse(sdf):
grammar = r"""
start: "(DELAYFILE" ( "(SDFVERSION" _NOB ")"
| "(DESIGN" "\"" NAME "\"" ")"
@ -201,13 +214,16 @@ def parse(sdf): @@ -201,13 +214,16 @@ def parse(sdf):
%ignore ( /\r?\n/ | COMMENT )+
%ignore /[\t\f ]+/
if '\n' not in str(sdf): # One line?: Assuming it is a file name.
if str(sdf).endswith('.gz'):
with, 'rt') as f:
text =
with open(sdf, 'r') as f:
text =
text = str(sdf)
def parse(text):
"""Parses the given ``text`` and returns a :class:`DelayFile` object."""
return Lark(grammar, parser="lalr", transformer=SdfTransformer()).parse(text)
def load(file):
"""Parses the contents of ``file`` and returns a :class:`DelayFile` object.
The given file may be gzip compressed.
return parse(readtext(file))


@ -1,8 +1,19 @@ @@ -1,8 +1,19 @@
from lark import Lark, Transformer
from collections import namedtuple
"""A simple and incomplete parser for the Standard Test Interface Language (STIL).
The main purpose of this parser is to load scan pattern sets from STIL files.
It supports only a very limited subset of STIL.
The functions :py:func:`load` and :py:func:`read` return an intermediate representation (:class:`StilFile` object).
Call :py:func:`StilFile.tests4v`, :py:func:`StilFile.tests8v`, or :py:func:`StilFile.responses4v` to
obtain the appropriate vector sets.
import re
import gzip
from .packed_vectors import PackedVectors
from collections import namedtuple
from lark import Lark, Transformer
from . import readtext, logic
from .logic_sim import LogicSim
@ -11,6 +22,8 @@ ScanPattern = namedtuple('ScanPattern', ['load', 'launch', 'capture', 'unload']) @@ -11,6 +22,8 @@ ScanPattern = namedtuple('ScanPattern', ['load', 'launch', 'capture', 'unload'])
class StilFile:
"""An intermediate representation of a STIL file.
def __init__(self, version, signal_groups, scan_chains, calls):
self.version = version
self.signal_groups = signal_groups
@ -21,7 +34,7 @@ class StilFile: @@ -21,7 +34,7 @@ class StilFile:
self.patterns = []
launch = {}
capture = {}
load = {}
sload = {}
for call in self.calls:
if == 'load_unload':
unload = {}
@ -29,13 +42,13 @@ class StilFile: @@ -29,13 +42,13 @@ class StilFile:
if so_port in call.parameters:
unload[so_port] = call.parameters[so_port].replace('\n', '')
if len(launch) > 0:
self.patterns.append(ScanPattern(load, launch, capture, unload))
self.patterns.append(ScanPattern(sload, launch, capture, unload))
capture = {}
launch = {}
load = {}
sload = {}
for si_port in self.si_ports:
if si_port in call.parameters:
load[si_port] = call.parameters[si_port].replace('\n', '')
sload[si_port] = call.parameters[si_port].replace('\n', '')
if'_launch') or'_capture'):
if len(launch) == 0:
launch = dict((k, v.replace('\n', '')) for k, v in call.parameters.items())
@ -73,48 +86,69 @@ class StilFile: @@ -73,48 +86,69 @@ class StilFile:
scan_inversions[chain[-1]] = scan_out_inversion
return interface, pi_map, po_map, scan_maps, scan_inversions
def tests(self, c):
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(c)
tests = PackedVectors(len(self.patterns), len(interface), 2)
def tests(self, circuit):
"""Assembles and returns a scan test pattern set for given circuit.
This function assumes a static (stuck-at fault) test.
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(circuit)
tests = logic.MVArray((len(interface), len(self.patterns)))
for i, p in enumerate(self.patterns):
for si_port in self.si_ports.keys():
tests.set_values(i, p.load[si_port], scan_maps[si_port], scan_inversions[si_port])
tests.set_values(i, p.launch['_pi'], pi_map)
pattern = logic.mv_xor(p.load[si_port], scan_inversions[si_port])[scan_maps[si_port], i] =[:, 0][pi_map, i] = logic.MVArray(p.launch['_pi']).data[:, 0]
return tests
def tests8v(self, c):
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(c)
init = PackedVectors(len(self.patterns), len(interface), 2)
def tests_loc(self, circuit):
"""Assembles and returns a LoC scan test pattern set for given circuit.
This function assumes a launch-on-capture (LoC) delay test.
It performs a logic simulation to obtain the first capture pattern (the one that launches the
delay test) and assembles the test pattern set from from pairs for initialization- and launch-patterns.
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(circuit)
init = logic.MVArray((len(interface), len(self.patterns)), m=4)
# init = PackedVectors(len(self.patterns), len(interface), 2)
for i, p in enumerate(self.patterns):
# init.set_values(i, '0' * len(interface))
for si_port in self.si_ports.keys():
init.set_values(i, p.load[si_port], scan_maps[si_port], scan_inversions[si_port])
init.set_values(i, p.launch['_pi'], pi_map)
sim4v = LogicSim(c, len(init), 2)
pattern = logic.mv_xor(p.load[si_port], scan_inversions[si_port])[scan_maps[si_port], i] =[:, 0][pi_map, i] = logic.MVArray(p.launch['_pi']).data[:, 0]
launch_bp = logic.BPArray(init)
sim4v = LogicSim(circuit, len(init), m=4)
launch = init.copy()
launch = logic.MVArray(launch_bp)
for i, p in enumerate(self.patterns):
# if there was no launch clock, then init = launch
if ('P' not in p.launch['_pi']) or ('P' not in p.capture['_pi']):
for si_port in self.si_ports.keys():
launch.set_values(i, p.load[si_port], scan_maps[si_port], scan_inversions[si_port])
pattern = logic.mv_xor(p.load[si_port], scan_inversions[si_port])[scan_maps[si_port], i] =[:, 0]
if '_pi' in p.capture and 'P' in p.capture['_pi']:
launch.set_values(i, p.capture['_pi'], pi_map)[pi_map, i] = logic.MVArray(p.capture['_pi']).data[:, 0][po_map, i] = logic.UNASSIGNED
return PackedVectors.from_pair(init, launch)
return logic.mv_transition(init, launch)
def responses(self, c):
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(c)
resp = PackedVectors(len(self.patterns), len(interface), 2)
def responses(self, circuit):
"""Assembles and returns a scan test response pattern set for given circuit."""
interface, pi_map, po_map, scan_maps, scan_inversions = self._maps(circuit)
resp = logic.MVArray((len(interface), len(self.patterns)))
# resp = PackedVectors(len(self.patterns), len(interface), 2)
for i, p in enumerate(self.patterns):
if len(p.capture) > 0:
resp.set_values(i, p.capture['_po'], po_map)
resp.set_values(i, p.launch['_po'], po_map)[po_map, i] = logic.MVArray(p.capture['_po'] if len(p.capture) > 0 else p.launch['_po']).data[:, 0]
# if len(p.capture) > 0:
# resp.set_values(i, p.capture['_po'], po_map)
# else:
# resp.set_values(i, p.launch['_po'], po_map)
for so_port in self.so_ports.keys():
resp.set_values(i, p.unload[so_port], scan_maps[so_port], scan_inversions[so_port])
pattern = logic.mv_xor(p.unload[so_port], scan_inversions[so_port])[scan_maps[so_port], i] =[:, 0]
# resp.set_values(i, p.unload[so_port], scan_maps[so_port], scan_inversions[so_port])
return resp
@ -162,7 +196,6 @@ class StilTransformer(Transformer): @@ -162,7 +196,6 @@ class StilTransformer(Transformer):
return StilFile(float(args[0]), self._signal_groups, self._scan_chains, self._calls)
def parse(stil):
grammar = r"""
start: "STIL" FLOAT _ignore _block*
_block: signal_groups | scan_structures | pattern
@ -203,50 +236,16 @@ def parse(stil): @@ -203,50 +236,16 @@ def parse(stil):
_NOB: /[^{}]+/
%ignore ( /\r?\n/ | "//" /[^\n]*/ | /[\t\f ]/ )+
if '\n' not in str(stil): # One line?: Assuming it is a file name.
if str(stil).endswith('.gz'):
with, 'rt') as f:
text =
with open(stil, 'r') as f:
text =
text = str(stil)
return Lark(grammar, parser="lalr", transformer=StilTransformer()).parse(text)
def extract_scan_pattens(stil_calls):
pats = []
pi = None
scan_in = None
for call in stil_calls:
if == 'load_unload':
scan_out = call.parameters.get('Scan_Out')
if scan_out is not None:
scan_out = scan_out.replace('\n', '')
if pi: pats.append(ScanPattern(scan_in, pi, None, scan_out))
scan_in = call.parameters.get('Scan_In')
if scan_in is not None:
scan_in = scan_in.replace('\n', '')
if == 'allclock_capture':
pi = call.parameters['_pi'].replace('\n', '')
return pats
def match_patterns(stil_file, pats, interface):
intf_pos = dict([(, i) for i, n in enumerate(interface)])
pi_map = [intf_pos[n] for n in stil_file.signal_groups['_pi']]
scan_map = [intf_pos[re.sub(r'b..\.', '', n)] for n in reversed(stil_file.scan_chains['1'])]
# print(scan_map)
tests = PackedVectors(len(pats), len(interface), 2)
for i, p in enumerate(pats):
tests.set_values(i, p.scan_in, scan_map)
tests.set_values(i, p.pi, pi_map)
def parse(text):
"""Parses the given ``text`` and returns a :class:`StilFile` object."""
return Lark(grammar, parser="lalr", transformer=StilTransformer()).parse(text)
resp = PackedVectors(len(pats), len(interface), 2)
for i, p in enumerate(pats):
resp.set_values(i, p.pi, pi_map)
resp.set_values(i, p.scan_out, scan_map)
return tests, resp
def load(file):
"""Parses the contents of ``file`` and returns a :class:`StilFile` object.
The given file may be gzip compressed.
return parse(readtext(file))


@ -1,8 +1,14 @@ @@ -1,8 +1,14 @@
"""A simple and incomplete parser for Verilog files.
The main purpose of this parser is to load synthesized, non-hierarchical (flat) gate-level netlists.
It supports only a very limited subset of Verilog.
from collections import namedtuple
import gzip
from lark import Lark, Transformer
from . import readtext
from .circuit import Circuit, Node, Line
from .saed import pin_index, pin_is_output
@ -152,22 +158,21 @@ grammar = """ @@ -152,22 +158,21 @@ grammar = """
def loads(s, *, branchforks=False):
return Lark(grammar, parser="lalr", transformer=VerilogTransformer(branchforks)).parse(s)
def parse(text, *, branchforks=False):
"""Parses the given ``text`` as Verilog code.
:param text: A string with Verilog code.
:param branchforks: If set to ``True``, the returned circuit will include additional `forks` on each fanout branch.
These forks are needed to correctly annotate interconnect delays
(see :py:func:`kyupy.sdf.DelayFile.annotation`).
:return: A :class:`~kyupy.circuit.Circuit` object.
return Lark(grammar, parser="lalr", transformer=VerilogTransformer(branchforks)).parse(text)
def load(fp, *, branchforks=False):
return loads(, branchforks=branchforks)
def load(file, *args, **kwargs):
"""Parses the contents of ``file`` as Verilog code.
def parse(verilog, branchforks=False):
if '\n' not in str(verilog): # One line?: Assuming it is a file name.
if str(verilog).endswith('.gz'):
with, 'rt') as f:
text =
with open(verilog, 'r') as f:
text =
text = str(verilog)
return loads(text, branchforks=branchforks)
The given file may be gzip compressed. Takes the same keyword arguments as :py:func:`parse`.
return parse(readtext(file), *args, **kwargs)


@ -1,8 +1,24 @@ @@ -1,8 +1,24 @@
"""High-Throughput combinational logic timing simulators.
These simulators work similarly to :py:class:`kyupy.logic_sim.LogicSim`.
They propagate values through the combinational circuit from (pseudo) primary inputs to (pseudo) primary outputs.
Instead of propagating logic values, these simulators propagate signal histories (waveforms).
They are designed to run many simulations in parallel and while their latencies are quite high, they achieve
high throughput performance.
The simulators are not event-based and are not capable of simulating sequential circuits directly.
Two simulators are available: :py:class:`WaveSim` runs on the CPU, and the derived class
:py:class:`WaveSimCuda` runs on the GPU.
import math
from bisect import bisect, insort_left
import numpy as np
from . import numba
from . import cuda
TMAX = np.float32(2 ** 127) # almost np.PINF for 32-bit floating point values
@ -77,6 +93,7 @@ class Heap: @@ -77,6 +93,7 @@ class Heap:
class WaveSim:
"""A waveform-based combinational logic timing simulator."""
def __init__(self, circuit, timing, sims=8, wavecaps=16, strip_forks=False, keep_waveforms=True):
self.circuit = circuit
self.sims = sims
@ -243,19 +260,24 @@ class WaveSim: @@ -243,19 +260,24 @@ class WaveSim:
self.timing[line, 0, polarity] = delay
def assign(self, vectors, time=0.0, offset=0):
nvectors = min(vectors.nvectors - offset, self.sims)
nvectors = min(len(vectors) - offset, self.sims)
for i, node in enumerate(self.interface):
ppi_loc = self.sat[self.ppi_offset + i, 0]
if ppi_loc < 0: continue
for p in range(nvectors):
vector = p + offset
a = vectors.bits[i, :, vector // 8]
a =[i, :, vector // 8]
m = self.mask[vector % 8]
toggle = 0
if len(a) <= 2:
if a[0] & m[1]:
self.state[ppi_loc, p] = TMIN
toggle += 1
if (len(a) > 2) and (a[2] & m[1]) and ((a[0] & m[1]) == (a[1] & m[1])):
if a[1] & m[1]:
self.state[ppi_loc, p] = TMIN
toggle += 1
if (a[2] & m[1]) and ((a[0] & m[1]) != (a[1] & m[1])):
self.state[ppi_loc + toggle, p] = time
toggle += 1
self.state[ppi_loc + toggle, p] = TMAX
@ -519,3 +541,312 @@ def wave_eval(op, state, sat, st_idx, line_times, sd=0.0, seed=0): @@ -519,3 +541,312 @@ def wave_eval(op, state, sat, st_idx, line_times, sd=0.0, seed=0):
state[z_mem + z_cur, st_idx] = a if a > b else b # propagate overflow flags by storing biggest TMAX from input
return overflows
class WaveSimCuda(WaveSim):
"""A GPU-accelerated waveform-based combinational logic timing simulator."""
def __init__(self, circuit, timing, sims=8, wavecaps=16, strip_forks=False, keep_waveforms=True):
super().__init__(circuit, timing, sims, wavecaps, strip_forks, keep_waveforms)
self.tdata = np.zeros((len(self.interface), 3, (sims - 1) // 8 + 1), dtype='uint8')
self.d_state = cuda.to_device(self.state)
self.d_sat = cuda.to_device(self.sat)
self.d_ops = cuda.to_device(self.ops)
self.d_timing = cuda.to_device(self.timing)
self.d_tdata = cuda.to_device(self.tdata)
self.d_cdata = cuda.to_device(self.cdata)
self._block_dim = (32, 16)
def get_line_delay(self, line, polarity):
return self.d_timing[line, 0, polarity]
def set_line_delay(self, line, polarity, delay):
self.d_timing[line, 0, polarity] = delay
def assign(self, vectors, time=0.0, offset=0):
assert (offset % 8) == 0
byte_offset = offset // 8
assert byte_offset <[-1]
pdim = min([-1] - byte_offset, self.tdata.shape[-1])
self.tdata[..., 0:pdim] =[..., byte_offset:pdim + byte_offset]
if vectors.m == 2:
self.tdata[:, 2, 0:pdim] = 0
cuda.to_device(self.tdata, to=self.d_tdata)
grid_dim = self._grid_dim(self.sims, len(self.interface))
assign_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppi_offset,
len(self.interface), self.d_tdata, time)
def _grid_dim(self, x, y):
gx = math.ceil(x / self._block_dim[0])
gy = math.ceil(y / self._block_dim[1])
return gx, gy
def propagate(self, sims=None, sd=0.0, seed=1):
if sims is None:
sims = self.sims
sims = min(sims, self.sims)
for op_start, op_stop in zip(self.level_starts, self.level_stops):
grid_dim = self._grid_dim(sims, op_stop - op_start)
wave_kernel[grid_dim, self._block_dim](self.d_ops, op_start, op_stop, self.d_state, self.sat, int(0),
sims, self.d_timing, sd, seed)
self.lst_eat_valid = False
def wave(self, line, vector):
if line < 0:
return None
mem, wcap, _ = self.sat[line]
if mem < 0:
return None
return self.d_state[mem:mem + wcap, vector]
def capture(self, time=TMAX, sd=0, seed=1, cdata=None, offset=0):
grid_dim = self._grid_dim(self.sims, len(self.interface))
capture_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppo_offset,
self.d_cdata, time, sd * math.sqrt(2), seed)
self.cdata[...] = self.d_cdata
if cdata is not None:
assert offset < cdata.shape[1]
cap_dim = min(cdata.shape[1] - offset, self.sims)
cdata[:, offset:cap_dim + offset] = self.cdata[:, 0:cap_dim]
self.lst_eat_valid = True
return self.cdata
def reassign(self, time=0.0):
grid_dim = self._grid_dim(self.sims, len(self.interface))
reassign_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppi_offset, self.ppo_offset,
self.d_cdata, time)
def wavecaps(self):
gx = math.ceil(len(self.circuit.lines) / 512)
wavecaps_kernel[gx, 512](self.d_state, self.d_sat, self.sims)
self.sat[...] = self.d_sat
return self.sat[..., 2]
def wavecaps_kernel(state, sat, sims):
idx = cuda.grid(1)
if idx >= len(sat): return
lidx, lcap, _ = sat[idx]
if lidx < 0: return
wcap = 0
for sidx in range(sims):
for tidx in range(lcap):
t = state[lidx + tidx, sidx]
if tidx > wcap:
wcap = tidx
if t >= TMAX: break
sat[idx, 2] = wcap + 1
def reassign_kernel(state, sat, ppi_offset, ppo_offset, cdata, ppi_time):
vector, y = cuda.grid(2)
if vector >= state.shape[-1]: return
if ppo_offset + y >= len(sat): return
ppo, ppo_cap, _ = sat[ppo_offset + y]
ppi, ppi_cap, _ = sat[ppi_offset + y]
if ppo < 0: return
if ppi < 0: return
ppo_val = int(cdata[y, vector, 1])
ppi_val = int(0)
for tidx in range(ppi_cap):
t = state[ppi + tidx, vector]
if t >= TMAX: break
ppi_val ^= 1
# make new waveform at PPI
toggle = 0
if ppi_val:
state[ppi + toggle, vector] = TMIN
toggle += 1
if ppi_val != ppo_val:
state[ppi + toggle, vector] = ppi_time
toggle += 1
state[ppi + toggle, vector] = TMAX
def capture_kernel(state, sat, ppo_offset, cdata, time, s_sqrt2, seed):
x, y = cuda.grid(2)
if ppo_offset + y >= len(sat): return
line, tdim, _ = sat[ppo_offset + y]
if line < 0: return
if x >= state.shape[-1]: return
vector = x
m = 0.5
acc = 0.0
eat = TMAX
lst = TMIN
tog = 0
ovl = 0
val = int(0)
final = int(0)
for tidx in range(tdim):
t = state[line + tidx, vector]
if t >= TMAX:
if t == TMAX_OVL:
ovl = 1
m = -m
final ^= 1
if t < time:
val ^= 1
if t <= TMIN: continue
if s_sqrt2 > 0:
acc += m * (1 + math.erf((t - time) / s_sqrt2))
eat = min(eat, t)
lst = max(lst, t)
tog += 1
if s_sqrt2 > 0:
if m < 0:
acc += 1
if acc >= 0.99:
val = 1
elif acc > 0.01:
seed = (seed << 4) + (vector << 20) + (y << 1)
seed = int(0xDEECE66D) * seed + 0xB
seed = int(0xDEECE66D) * seed + 0xB
rnd = float((seed >> 8) & 0xffffff) / float(1 << 24)
val = rnd < acc
val = 0
acc = val
cdata[y, vector, 0] = acc
cdata[y, vector, 1] = val
cdata[y, vector, 2] = final
cdata[y, vector, 3] = (val != final)
cdata[y, vector, 4] = eat
cdata[y, vector, 5] = lst
cdata[y, vector, 6] = ovl
def assign_kernel(state, sat, ppi_offset, intf_len, tdata, time):
x, y = cuda.grid(2)
if y >= intf_len: return
line = sat[ppi_offset + y, 0]
if line < 0: return
sdim = state.shape[-1]
if x >= sdim: return
vector = x
a0 = tdata[y, 0, vector // 8]
a1 = tdata[y, 1, vector // 8]
a2 = tdata[y, 2, vector // 8]
m = np.uint8(1 << (7 - (vector % 8)))
toggle = 0
if a1 & m:
state[line + toggle, x] = TMIN
toggle += 1
if (a2 & m) and ((a0 & m) != (a1 & m)):
state[line + toggle, x] = time
toggle += 1
state[line + toggle, x] = TMAX
def rand_gauss_dev(seed, sd):
clamp = 0.5
if sd <= 0.0:
return 1.0
while True:
x = -6.0
for i in range(12):
seed = int(0xDEECE66D) * seed + 0xB
x += float((seed >> 8) & 0xffffff) / float(1 << 24)
x *= sd
if abs(x) <= clamp:
return x + 1.0
def wave_kernel(ops, op_start, op_stop, state, sat, st_start, st_stop, line_times, sd, seed):
x, y = cuda.grid(2)
st_idx = st_start + x
op_idx = op_start + y
if st_idx >= st_stop: return
if op_idx >= op_stop: return
lut = ops[op_idx, 0]
z_idx = ops[op_idx, 1]
a_idx = ops[op_idx, 2]
b_idx = ops[op_idx, 3]
overflows = int(0)
_seed = (seed << 4) + (z_idx << 20) + (st_idx << 1)
a_mem = sat[a_idx, 0]
b_mem = sat[b_idx, 0]
z_mem, z_cap, _ = sat[z_idx]
a_cur = int(0)
b_cur = int(0)
z_cur = lut & 1
if z_cur == 1:
state[z_mem, st_idx] = TMIN
a = state[a_mem, st_idx] + line_times[a_idx, 0, z_cur] * rand_gauss_dev(_seed ^ a_mem ^ z_cur, sd)
b = state[b_mem, st_idx] + line_times[b_idx, 0, z_cur] * rand_gauss_dev(_seed ^ b_mem ^ z_cur, sd)
previous_t = TMIN
current_t = min(a, b)
inputs = int(0)
while current_t < TMAX:
z_val = z_cur & 1
if b < a:
b_cur += 1
b = state[b_mem + b_cur, st_idx]
b += line_times[b_idx, 0, z_val ^ 1] * rand_gauss_dev(_seed ^ b_mem ^ z_val ^ 1, sd)
thresh = line_times[b_idx, 1, z_val] * rand_gauss_dev(_seed ^ b_mem ^ z_val, sd)
inputs ^= 2
next_t = b
a_cur += 1
a = state[a_mem + a_cur, st_idx]
a += line_times[a_idx, 0, z_val ^ 1] * rand_gauss_dev(_seed ^ a_mem ^ z_val ^ 1, sd)
thresh = line_times[a_idx, 1, z_val] * rand_gauss_dev(_seed ^ a_mem ^ z_val, sd)
inputs ^= 1
next_t = a
if (z_cur & 1) != ((lut >> inputs) & 1):
# we generate a toggle in z_mem, if:
# ( it is the first toggle in z_mem OR
# following toggle is earlier OR
# pulse is wide enough ) AND enough space in z_mem.
if z_cur == 0 or next_t < current_t or (current_t - previous_t) > thresh:
if z_cur < (z_cap - 1):
state[z_mem + z_cur, st_idx] = current_t
previous_t = current_t
z_cur += 1
overflows += 1
previous_t = state[z_mem + z_cur - 1, st_idx]
z_cur -= 1
z_cur -= 1
if z_cur > 0:
previous_t = state[z_mem + z_cur - 1, st_idx]
previous_t = TMIN
current_t = min(a, b)
if overflows > 0:
state[z_mem + z_cur, st_idx] = TMAX_OVL
state[z_mem + z_cur, st_idx] = a if a > b else b # propagate overflow flags by storing biggest TMAX from input


@ -1,317 +0,0 @@ @@ -1,317 +0,0 @@
import numpy as np
import math
from .wave_sim import WaveSim
from . import cuda
TMAX = np.float32(2 ** 127) # almost np.PINF for 32-bit floating point values
TMAX_OVL = np.float32(1.1 * 2 ** 127) # almost np.PINF with overflow mark
TMIN = np.float32(-2 ** 127) # almost np.NINF for 32-bit floating point values
class WaveSimCuda(WaveSim):
def __init__(self, circuit, timing, sims=8, wavecaps=16, strip_forks=False, keep_waveforms=True):
super().__init__(circuit, timing, sims, wavecaps, strip_forks, keep_waveforms)
self.tdata = np.zeros((len(self.interface), 3, (sims - 1) // 8 + 1), dtype='uint8')
self.d_state = cuda.to_device(self.state)
self.d_sat = cuda.to_device(self.sat)
self.d_ops = cuda.to_device(self.ops)
self.d_timing = cuda.to_device(self.timing)
self.d_tdata = cuda.to_device(self.tdata)
self.d_cdata = cuda.to_device(self.cdata)
self._block_dim = (32, 16)
def get_line_delay(self, line, polarity):
return self.d_timing[line, 0, polarity]
def set_line_delay(self, line, polarity, delay):
self.d_timing[line, 0, polarity] = delay
def assign(self, vectors, time=0.0, offset=0):
assert (offset % 8) == 0
byte_offset = offset // 8
assert byte_offset < vectors.bits.shape[-1]
pdim = min(vectors.bits.shape[-1] - byte_offset, self.tdata.shape[-1])
self.tdata[..., 0:pdim] = vectors.bits[..., byte_offset:pdim + byte_offset]
if vectors.vdim == 1:
self.tdata[:, 1, 0:pdim] = ~self.tdata[:, 1, 0:pdim]
self.tdata[:, 2, 0:pdim] = 0
cuda.to_device(self.tdata, to=self.d_tdata)
grid_dim = self._grid_dim(self.sims, len(self.interface))
assign_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppi_offset,
len(self.interface), self.d_tdata, time)
def _grid_dim(self, x, y):
gx = math.ceil(x / self._block_dim[0])
gy = math.ceil(y / self._block_dim[1])
return gx, gy
def propagate(self, sims=None, sd=0.0, seed=1):
if sims is None:
sims = self.sims
sims = min(sims, self.sims)
for op_start, op_stop in zip(self.level_starts, self.level_stops):
grid_dim = self._grid_dim(sims, op_stop - op_start)
wave_kernel[grid_dim, self._block_dim](self.d_ops, op_start, op_stop, self.d_state, self.sat, int(0),
sims, self.d_timing, sd, seed)
self.lst_eat_valid = False
def wave(self, line, vector):
if line < 0:
return None
mem, wcap, _ = self.sat[line]
if mem < 0:
return None
return self.d_state[mem:mem + wcap, vector]
def capture(self, time=TMAX, sd=0, seed=1, cdata=None, offset=0):
grid_dim = self._grid_dim(self.sims, len(self.interface))
capture_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppo_offset,
self.d_cdata, time, sd * math.sqrt(2), seed)
self.cdata[...] = self.d_cdata
if cdata is not None:
assert offset < cdata.shape[1]
cap_dim = min(cdata.shape[1] - offset, self.sims)
cdata[:, offset:cap_dim + offset] = self.cdata[:, 0:cap_dim]
self.lst_eat_valid = True
return self.cdata
def reassign(self, time=0.0):
grid_dim = self._grid_dim(self.sims, len(self.interface))
reassign_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppi_offset, self.ppo_offset,
self.d_cdata, time)
def wavecaps(self):
gx = math.ceil(len(self.circuit.lines) / 512)
wavecaps_kernel[gx, 512](self.d_state, self.d_sat, self.sims)
self.sat[...] = self.d_sat
return self.sat[..., 2]
def wavecaps_kernel(state, sat, sims):
idx = cuda.grid(1)
if idx >= len(sat): return
lidx, lcap, _ = sat[idx]
if lidx < 0: return
wcap = 0
for sidx in range(sims):
for tidx in range(lcap):
t = state[lidx + tidx, sidx]
if tidx > wcap:
wcap = tidx
if t >= TMAX: break
sat[idx, 2] = wcap + 1
def reassign_kernel(state, sat, ppi_offset, ppo_offset, cdata, ppi_time):
vector, y = cuda.grid(2)
if vector >= state.shape[-1]: return
if ppo_offset + y >= len(sat): return
ppo, ppo_cap, _ = sat[ppo_offset + y]
ppi, ppi_cap, _ = sat[ppi_offset + y]
if ppo < 0: return
if ppi < 0: return
ppo_val = int(cdata[y, vector, 1])
ppi_val = int(0)
for tidx in range(ppi_cap):
t = state[ppi + tidx, vector]
if t >= TMAX: break
ppi_val ^= 1
# make new waveform at PPI
toggle = 0
if ppi_val:
state[ppi + toggle, vector] = TMIN
toggle += 1
if ppi_val != ppo_val:
state[ppi + toggle, vector] = ppi_time
toggle += 1
state[ppi + toggle, vector] = TMAX
def capture_kernel(state, sat, ppo_offset, cdata, time, s_sqrt2, seed):
x, y = cuda.grid(2)
if ppo_offset + y >= len(sat): return
line, tdim, _ = sat[ppo_offset + y]
if line < 0: return
if x >= state.shape[-1]: return
vector = x
m = 0.5
acc = 0.0
eat = TMAX
lst = TMIN
tog = 0
ovl = 0
val = int(0)
final = int(0)
for tidx in range(tdim):
t = state[line + tidx, vector]
if t >= TMAX:
if t == TMAX_OVL:
ovl = 1
m = -m
final ^= 1
if t < time:
val ^= 1
if t <= TMIN: continue
if s_sqrt2 > 0:
acc += m * (1 + math.erf((t - time) / s_sqrt2))
eat = min(eat, t)
lst = max(lst, t)
tog += 1
if s_sqrt2 > 0:
if m < 0:
acc += 1
if acc >= 0.99:
val = 1
elif acc > 0.01:
seed = (seed << 4) + (vector << 20) + (y << 1)
seed = int(0xDEECE66D) * seed + 0xB
seed = int(0xDEECE66D) * seed + 0xB
rnd = float((seed >> 8) & 0xffffff) / float(1 << 24)
val = rnd < acc
val = 0
acc = val
cdata[y, vector, 0] = acc
cdata[y, vector, 1] = val
cdata[y, vector, 2] = final
cdata[y, vector, 3] = (val != final)
cdata[y, vector, 4] = eat
cdata[y, vector, 5] = lst
cdata[y, vector, 6] = ovl
def assign_kernel(state, sat, ppi_offset, intf_len, tdata, time):
x, y = cuda.grid(2)
if y >= intf_len: return
line = sat[ppi_offset + y, 0]
if line < 0: return
sdim = state.shape[-1]
if x >= sdim: return
vector = x
a0 = tdata[y, 0, vector // 8]
a1 = tdata[y, 1, vector // 8]
a2 = tdata[y, 2, vector // 8]
m = np.uint8(1 << (7 - (vector % 8)))
toggle = 0
if a0 & m:
state[line + toggle, x] = TMIN
toggle += 1
if (a2 & m) and ((a0 & m) == (a1 & m)):
state[line + toggle, x] = time
toggle += 1
state[line + toggle, x] = TMAX
def rand_gauss(seed, sd):
clamp = 0.5
if sd <= 0.0:
return 1.0
while True:
x = -6.0
for i in range(12):
seed = int(0xDEECE66D) * seed + 0xB
x += float((seed >> 8) & 0xffffff) / float(1 << 24)
x *= sd
if abs(x) <= clamp:
return x + 1.0
def wave_kernel(ops, op_start, op_stop, state, sat, st_start, st_stop, line_times, sd, seed):
x, y = cuda.grid(2)
st_idx = st_start + x
op_idx = op_start + y
if st_idx >= st_stop: return
if op_idx >= op_stop: return
lut = ops[op_idx, 0]
z_idx = ops[op_idx, 1]
a_idx = ops[op_idx, 2]
b_idx = ops[op_idx, 3]
overflows = int(0)
_seed = (seed << 4) + (z_idx << 20) + (st_idx << 1)
a_mem = sat[a_idx, 0]
b_mem = sat[b_idx, 0]
z_mem, z_cap, _ = sat[z_idx]
a_cur = int(0)
b_cur = int(0)
z_cur = lut & 1
if z_cur == 1:
state[z_mem, st_idx] = TMIN
a = state[a_mem, st_idx] + line_times[a_idx, 0, z_cur] * rand_gauss(_seed ^ a_mem ^ z_cur, sd)
b = state[b_mem, st_idx] + line_times[b_idx, 0, z_cur] * rand_gauss(_seed ^ b_mem ^ z_cur, sd)
previous_t = TMIN
current_t = min(a, b)
inputs = int(0)
while current_t < TMAX:
z_val = z_cur & 1
if b < a:
b_cur += 1
b = state[b_mem + b_cur, st_idx]
b += line_times[b_idx, 0, z_val ^ 1] * rand_gauss(_seed ^ b_mem ^ z_val ^ 1, sd)
thresh = line_times[b_idx, 1, z_val] * rand_gauss(_seed ^ b_mem ^ z_val, sd)
inputs ^= 2
next_t = b
a_cur += 1
a = state[a_mem + a_cur, st_idx]
a += line_times[a_idx, 0, z_val ^ 1] * rand_gauss(_seed ^ a_mem ^ z_val ^ 1, sd)
thresh = line_times[a_idx, 1, z_val] * rand_gauss(_seed ^ a_mem ^ z_val, sd)
inputs ^= 1
next_t = a
if (z_cur & 1) != ((lut >> inputs) & 1):
# we generate a toggle in z_mem, if:
# ( it is the first toggle in z_mem OR
# following toggle is earlier OR
# pulse is wide enough ) AND enough space in z_mem.
if z_cur == 0 or next_t < current_t or (current_t - previous_t) > thresh:
if z_cur < (z_cap - 1):
state[z_mem + z_cur, st_idx] = current_t
previous_t = current_t
z_cur += 1
overflows += 1
previous_t = state[z_mem + z_cur - 1, st_idx]
z_cur -= 1
z_cur -= 1
if z_cur > 0:
previous_t = state[z_mem + z_cur - 1, st_idx]
previous_t = TMIN
current_t = min(a, b)
if overflows > 0:
state[z_mem + z_cur, st_idx] = TMAX_OVL
state[z_mem + z_cur, st_idx] = a if a > b else b # propagate overflow flags by storing biggest TMAX from input


@ -5,7 +5,7 @@ def test_b01(mydir): @@ -5,7 +5,7 @@ def test_b01(mydir):
with open(mydir / 'b01.bench', 'r') as f:
c = bench.parse(
assert 92 == len(c.nodes)
c = bench.parse(mydir / 'b01.bench')
c = bench.load(mydir / 'b01.bench')
assert 92 == len(c.nodes)


@ -1,6 +1,51 @@ @@ -1,6 +1,51 @@
from kyupy.circuit import Circuit, Node, Line
def test_lines():
c = Circuit()
n1 = Node(c, 'n1')
n2 = Node(c, 'n2')
line = Line(c, n1, n2)
assert line.driver == n1
assert line.reader == n2
assert line.driver_pin == 0
assert line.reader_pin == 0
assert n1.outs[0] == line
assert n2.ins[0] == line
line2 = Line(c, n1, (n2, 2))
assert line2.driver == n1
assert line2.reader == n2
assert line2.driver_pin == 1
assert line2.reader_pin == 2
assert n1.outs[0] == line
assert n1.outs[1] == line2
assert n2.ins[1] is None
assert n2.ins[2] == line2
line3 = Line(c, n1, n2)
assert line3.driver_pin == 2
assert line3.reader_pin == 1
assert n1.outs[2] == line3
assert n2.ins[1] == line3
assert n2.ins[2] == line2
assert len(c.lines) == 3
assert len(c.lines) == 2
assert c.lines[0].index == 0
assert c.lines[1].index == 1
assert n1.outs[2] is None
assert n2.ins[1] is None
assert n2.ins[2] == line2
def test_circuit():
c = Circuit()
in1 = Node(c, 'in1', 'buf')


@ -0,0 +1,214 @@ @@ -0,0 +1,214 @@
import kyupy.logic as lg
def test_mvarray():
# instantiation with shape
ary = lg.MVArray(4)
assert ary.length == 1
assert len(ary) == 1
assert ary.width == 4
ary = lg.MVArray((3, 2))
assert ary.length == 2
assert len(ary) == 2
assert ary.width == 3
# instantiation with single vector
ary = lg.MVArray([1, 0, 1])
assert ary.length == 1
assert ary.width == 3
assert str(ary) == "['101']"
assert ary[0] == '101'
ary = lg.MVArray("10X-")
assert ary.length == 1
assert ary.width == 4
assert str(ary) == "['10X-']"
assert ary[0] == '10X-'
ary = lg.MVArray("1")
assert ary.length == 1
assert ary.width == 1
ary = lg.MVArray(["1"])
assert ary.length == 1
assert ary.width == 1
# instantiation with multiple vectors
ary = lg.MVArray([[0, 0], [0, 1], [1, 0], [1, 1]])
assert ary.length == 4
assert ary.width == 2
ary = lg.MVArray(["000", "001", "110", "---"])
assert ary.length == 4
assert ary.width == 3
assert str(ary) == "['000', '001', '110', '---']"
assert ary[2] == '110'
# casting to 2-valued logic
ary = lg.MVArray([0, 1, 2, None], m=2)
assert[0] == lg.ZERO
assert[1] == lg.ONE
assert[2] == lg.ZERO
assert[3] == lg.ZERO
ary = lg.MVArray("0-X1PRFN", m=2)
assert[0] == lg.ZERO
assert[1] == lg.ZERO
assert[2] == lg.ZERO
assert[3] == lg.ONE
assert[4] == lg.ZERO
assert[5] == lg.ONE
assert[6] == lg.ZERO
assert[7] == lg.ONE
# casting to 4-valued logic
ary = lg.MVArray([0, 1, 2, None, 'F'], m=4)
assert[0] == lg.ZERO
assert[1] == lg.ONE
assert[2] == lg.UNKNOWN
assert[3] == lg.UNASSIGNED
assert[4] == lg.ZERO
ary = lg.MVArray("0-X1PRFN", m=4)
assert[0] == lg.ZERO
assert[1] == lg.UNASSIGNED
assert[2] == lg.UNKNOWN
assert[3] == lg.ONE
assert[4] == lg.ZERO
assert[5] == lg.ONE
assert[6] == lg.ZERO
assert[7] == lg.ONE
# casting to 8-valued logic
ary = lg.MVArray([0, 1, 2, None, 'F'], m=8)
assert[0] == lg.ZERO
assert[1] == lg.ONE
assert[2] == lg.UNKNOWN
assert[3] == lg.UNASSIGNED
assert[4] == lg.FALL
ary = lg.MVArray("0-X1PRFN", m=8)
assert[0] == lg.ZERO
assert[1] == lg.UNASSIGNED
assert[2] == lg.UNKNOWN
assert[3] == lg.ONE
assert[4] == lg.PPULSE
assert[5] == lg.RISE
assert[6] == lg.FALL
assert[7] == lg.NPULSE
# copy constructor and casting
ary8 = lg.MVArray(ary, m=8)
assert ary8.length == 1
assert ary8.width == 8
assert[7] == lg.NPULSE
ary4 = lg.MVArray(ary, m=4)
assert[1] == lg.UNASSIGNED
assert[7] == lg.ONE
ary2 = lg.MVArray(ary, m=2)
assert[1] == lg.ZERO
assert[7] == lg.ONE
def test_mv_operations():
x1_2v = lg.MVArray("0011", m=2)
x2_2v = lg.MVArray("0101", m=2)
x1_4v = lg.MVArray("0000XXXX----1111", m=4)
x2_4v = lg.MVArray("0X-10X-10X-10X-1", m=4)
x1_8v = lg.MVArray("00000000XXXXXXXX--------11111111PPPPPPPPRRRRRRRRFFFFFFFFNNNNNNNN", m=8)
x2_8v = lg.MVArray("0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN", m=8)
assert lg.mv_not(x1_2v)[0] == '1100'
assert lg.mv_not(x1_4v)[0] == '1111XXXXXXXX0000'
assert lg.mv_not(x1_8v)[0] == '11111111XXXXXXXXXXXXXXXX00000000NNNNNNNNFFFFFFFFRRRRRRRRPPPPPPPP'
assert lg.mv_or(x1_2v, x2_2v)[0] == '0111'
assert lg.mv_or(x1_4v, x2_4v)[0] == '0XX1XXX1XXX11111'
assert lg.mv_or(x1_8v, x2_8v)[0] == '0XX1PRFNXXX1XXXXXXX1XXXX11111111PXX1PRFNRXX1RRNNFXX1FNFNNXX1NNNN'
assert lg.mv_and(x1_2v, x2_2v)[0] == '0001'
assert lg.mv_and(x1_4v, x2_4v)[0] == '00000XXX0XXX0XX1'
assert lg.mv_and(x1_8v, x2_8v)[0] == '000000000XXXXXXX0XXXXXXX0XX1PRFN0XXPPPPP0XXRPRPR0XXFPPFF0XXNPRFN'
assert lg.mv_xor(x1_2v, x2_2v)[0] == '0110'
assert lg.mv_xor(x1_4v, x2_4v)[0] == '0XX1XXXXXXXX1XX0'
def test_bparray():
ary = lg.BPArray(4)
assert ary.length == 1
assert len(ary) == 1
assert ary.width == 4
ary = lg.BPArray((3, 2))
assert ary.length == 2
assert len(ary) == 2
assert ary.width == 3
assert lg.MVArray(lg.BPArray("01", m=2))[0] == '01'
assert lg.MVArray(lg.BPArray("0X-1", m=4))[0] == '0X-1'
assert lg.MVArray(lg.BPArray("0X-1PRFN", m=8))[0] == '0X-1PRFN'
x1_2v = lg.BPArray("0011", m=2)
x2_2v = lg.BPArray("0101", m=2)
x1_4v = lg.BPArray("0000XXXX----1111", m=4)
x2_4v = lg.BPArray("0X-10X-10X-10X-1", m=4)
x1_8v = lg.BPArray("00000000XXXXXXXX--------11111111PPPPPPPPRRRRRRRRFFFFFFFFNNNNNNNN", m=8)
x2_8v = lg.BPArray("0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN0X-1PRFN", m=8)
out_2v = lg.BPArray((4, 1), m=2)
out_4v = lg.BPArray((16, 1), m=4)
out_8v = lg.BPArray((64, 1), m=8)
assert lg.MVArray(out_2v)[0] == '0011'
assert lg.MVArray(out_4v)[0] == '0000XXXXXXXX1111'
assert lg.MVArray(out_2v)[0] == '1100'
assert lg.MVArray(out_4v)[0] == '1111XXXXXXXX0000'
assert lg.MVArray(out_2v)[0] == '0111'
assert lg.MVArray(out_4v)[0] == '0XX1XXX1XXX11111'
assert lg.MVArray(out_2v)[0] == '0001'
assert lg.MVArray(out_4v)[0] == '00000XXX0XXX0XX1'
assert lg.MVArray(out_2v)[0] == '0110'
assert lg.MVArray(out_4v)[0] == '0XX1XXXXXXXX1XX0'


@ -1,161 +1,96 @@ @@ -1,161 +1,96 @@
from kyupy.logic_sim import LogicSim
from kyupy import bench
from kyupy.packed_vectors import PackedVectors
from kyupy.logic import MVArray, BPArray
def test_vd1():
def test_2v():
c = bench.parse('input(x, y) output(a, o, n) a=and(x,y) o=or(x,y) n=not(x)')
s = LogicSim(c, 4)
s = LogicSim(c, 4, m=2)
assert len(s.interface) == 5
p = PackedVectors(4, len(s.interface))
p[0] = '00000'
p[1] = '01000'
p[2] = '10000'
p[3] = '11000'
mva = MVArray(['00000', '01000', '10000', '11000'], m=2)
bpa = BPArray(mva)
assert p[0] == '00001'
assert p[1] == '01011'
assert p[2] == '10010'
assert p[3] == '11110'
mva = MVArray(bpa)
assert mva[0] == '00001'
assert mva[1] == '01011'
assert mva[2] == '10010'
assert mva[3] == '11110'
def test_vd2():
def test_4v():
c = bench.parse('input(x, y) output(a, o, n) a=and(x,y) o=or(x,y) n=not(x)')
s = LogicSim(c, 16, 2)
s = LogicSim(c, 16, m=4)
assert len(s.interface) == 5
p = PackedVectors(16, len(s.interface), 2)
p[0] = '00000'
p[1] = '01000'
p[2] = '0-000'
p[3] = '0X000'
p[4] = '10000'
p[5] = '11000'
p[6] = '1-000'
p[7] = '1X000'
p[8] = '-0000'
p[9] = '-1000'
p[10] = '--000'
p[11] = '-X000'
p[12] = 'X0000'
p[13] = 'X1000'
p[14] = 'X-000'
p[15] = 'XX000'
mva = MVArray(['00000', '01000', '0-000', '0X000',
'10000', '11000', '1-000', '1X000',
'-0000', '-1000', '--000', '-X000',
'X0000', 'X1000', 'X-000', 'XX000'], m=4)
bpa = BPArray(mva)
assert p[0] == '00001'
assert p[1] == '01011'
assert p[2] == '0-0X1'
assert p[3] == '0X0X1'
assert p[4] == '10010'
assert p[5] == '11110'
assert p[6] == '1-X10'
assert p[7] == '1XX10'
assert p[8] == '-00XX'
assert p[9] == '-1X1X'
assert p[10] == '--XXX'
assert p[11] == '-XXXX'
assert p[12] == 'X00XX'
assert p[13] == 'X1X1X'
assert p[14] == 'X-XXX'
assert p[15] == 'XXXXX'
mva = MVArray(bpa)
assert mva[0] == '00001'
assert mva[1] == '01011'
assert mva[2] == '0-0X1'
assert mva[3] == '0X0X1'
assert mva[4] == '10010'
assert mva[5] == '11110'
assert mva[6] == '1-X10'
assert mva[7] == '1XX10'
assert mva[8] == '-00XX'
assert mva[9] == '-1X1X'
assert mva[10] == '--XXX'
assert mva[11] == '-XXXX'
assert mva[12] == 'X00XX'
assert mva[13] == 'X1X1X'
assert mva[14] == 'X-XXX'
assert mva[15] == 'XXXXX'
def test_vd3():
def test_8v():
c = bench.parse('input(x, y) output(a, o, n, xo) a=and(x,y) o=or(x,y) n=not(x) xo=xor(x,y)')
s = LogicSim(c, 64, 3)
s = LogicSim(c, 64, m=8)
assert len(s.interface) == 6
p = PackedVectors(64, len(s.interface), 3)
p[0] = '000010'
p[1] = '010111'
p[2] = '0-0X1X'
p[3] = '0X0X1X'
p[4] = '0R0R1R'
p[5] = '0F0F1F'
p[6] = '0P0P1P'
p[7] = '0N0N1N'
p[8] = '100101'
p[9] = '111100'
p[10] = '1-X10X'
p[11] = '1XX10X'
p[12] = '1RR10F'
p[13] = '1FF10R'
p[14] = '1PP10N'
p[15] = '1NN10P'
p[16] = '-00XXX'
p[17] = '-1X1XX'
p[18] = '--XXXX'
p[19] = '-XXXXX'
p[20] = '-RXXXX'
p[21] = '-FXXXX'
p[22] = '-PXXXX'
p[23] = '-NXXXX'
p[24] = 'X00XXX'
p[25] = 'X1X1XX'
p[26] = 'X-XXXX'
p[27] = 'XXXXXX'
p[28] = 'XRXXXX'
p[29] = 'XFXXXX'
p[30] = 'XPXXXX'
p[31] = 'XNXXXX'
p[32] = 'R00RFR'
p[33] = 'R1R1FF'
p[34] = 'R-XXFX'
p[35] = 'RXXXFX'
p[36] = 'RRRRFP'
p[37] = 'RFPNFN'
p[38] = 'RPPRFR'
p[39] = 'RNRNFF'
p[40] = 'F00FRF'
p[41] = 'F1F1RR'
p[42] = 'F-XXRX'
p[43] = 'FXXXRX'
p[44] = 'FRPNRN'
p[45] = 'FFFFRP'
p[46] = 'FPPFRF'
p[47] = 'FNFNRR'
p[48] = 'P00PNP'
p[49] = 'P1P1NN'
p[50] = 'P-XXNX'
p[51] = 'PXXXNX'
p[52] = 'PRPRNR'
p[53] = 'PFPFNF'
p[54] = 'PPPPNP'
p[55] = 'PNPNNN'
p[56] = 'N00NPN'
p[57] = 'N1N1PP'
p[58] = 'N-XXPX'
p[59] = 'NXXXPX'
p[60] = 'NRRNPF'
p[61] = 'NFFNPR'
p[62] = 'NPPNPN'
p[63] = 'NNNNPP'
expect = p.copy()
mva = MVArray(['000010', '010111', '0-0X1X', '0X0X1X', '0R0R1R', '0F0F1F', '0P0P1P', '0N0N1N',
'100101', '111100', '1-X10X', '1XX10X', '1RR10F', '1FF10R', '1PP10N', '1NN10P',
'-00XXX', '-1X1XX', '--XXXX', '-XXXXX', '-RXXXX', '-FXXXX', '-PXXXX', '-NXXXX',
bpa = BPArray(mva)
resp_bp = BPArray(bpa)
resp = MVArray(resp_bp)
for i in range(64):
assert p[i] == expect[i]
assert resp[i] == mva[i]
def test_b01(mydir):
c = bench.parse(mydir / 'b01.bench')
c = bench.load(mydir / 'b01.bench')
# 2-valued
s = LogicSim(c, 8)
s = LogicSim(c, 8, m=2)
assert len(s.interface) == 9
t = PackedVectors(8, len(s.interface))
mva = MVArray((len(s.interface), 8), m=2)
# mva.randomize()
bpa = BPArray(mva)
# 8-valued
s = LogicSim(c, 8, 3)
t = PackedVectors(8, len(s.interface), 3)
s = LogicSim(c, 8, m=8)
mva = MVArray((len(s.interface), 8), m=8)
# mva.randomize()
bpa = BPArray(mva)


@ -1,88 +0,0 @@ @@ -1,88 +0,0 @@
from kyupy.packed_vectors import PackedVectors
def test_basic():
ba = PackedVectors(8, 1, 1)
assert '0\n0\n0\n0\n0\n0\n0\n0' == str(ba)
ba.set_value(0, 0, 1)
ba.set_value(1, 0, 'H')
ba.set_value(2, 0, 'h')
ba.set_value(3, 0, True)
ba.set_value(4, 0, 0)
ba.set_value(5, 0, 'L')
ba.set_value(6, 0, 'l')
ba.set_value(7, 0, False)
assert '1\n1\n1\n1\n0\n0\n0\n0' == str(ba)
ba.set_value(1, 0, '0')
ba.set_value(5, 0, '1')
assert '1\n0\n1\n1\n0\n1\n0\n0' == str(ba)
ba = PackedVectors(8, 1, 2)
assert '-\n-\n-\n-\n-\n-\n-\n-' == str(ba)
ba.set_value(0, 0, 1)
ba.set_value(7, 0, 0)
ba.set_value(4, 0, 'X')
assert '1\n-\n-\n-\nX\n-\n-\n0' == str(ba)
ba.set_value(4, 0, '-')
assert '1\n-\n-\n-\n-\n-\n-\n0' == str(ba)
ba = PackedVectors(8, 2, 2)
assert '--\n--\n--\n--\n--\n--\n--\n--' == str(ba)
ba.set_value(0, 0, '1')
ba.set_value(7, 1, '0')
ba.set_values(1, 'XX')
assert '1-\nXX\n--\n--\n--\n--\n--\n-0' == str(ba)
def test_8v():
ba = PackedVectors(1, 8, 3)
assert '--------' == str(ba)
ba.set_values(0, r'-x01^v\/')
assert r'-X01PNFR' == str(ba)
ba.set_values(0, '-XLHPNFR')
assert r'-X01PNFR' == str(ba)
ba.set_values(0, '-xlhpnfr')
assert r'-X01PNFR' == str(ba)
p1 = PackedVectors(1, 8, 1)
p2 = PackedVectors(1, 8, 1)
p1.set_values(0, '01010101')
p2.set_values(0, '00110011')
p = PackedVectors.from_pair(p1, p2)
assert r'0FR10FR1' == str(p)
p1 = PackedVectors(1, 8, 2)
p2 = PackedVectors(1, 8, 2)
p1.set_values(0, '0101-X-X')
p2.set_values(0, '00110011')
p = PackedVectors.from_pair(p1, p2)
assert r'0FR1----' == str(p)
p1.set_values(0, '0101-X-X')
p2.set_values(0, '-X-X--XX')
p = PackedVectors.from_pair(p1, p2)
assert r'--------' == str(p)
def test_slicing():
lv = PackedVectors(3, 2, 1)
assert '00\n00\n00' == str(lv)
lv.set_value(1, 0, '1')
lv.set_value(1, 1, '1')
assert '00' == lv[0]
assert '11' == lv[1]
assert 3 == len(lv)
lv2 = lv[1:3]
assert 2 == len(lv2)
assert '11' == lv2[0]
assert '00' == lv2[1]
def test_copy():
lv1 = PackedVectors(8, 1, 1)
lv1.set_values_for_position(0, '01010101')
lv2 = PackedVectors(8, 1, 1)
lv2.set_values_for_position(0, '00100101')
diff = lv1.diff(lv2)
lv3 = lv1.copy(selection_mask=diff)
assert str(lv3) == '1\n0\n1'
lv4 = lv1.copy(selection_mask=~diff)
assert str(lv4) == '0\n0\n1\n0\n1'
lv5 = lv3 + lv4
assert str(lv5) == '1\n0\n1\n0\n0\n1\n0\n1'


@ -74,13 +74,13 @@ def test_parse(): @@ -74,13 +74,13 @@ def test_parse():
def test_b14(mydir):
df = sdf.parse(mydir / 'b14.sdf.gz')
df = sdf.load(mydir / 'b14.sdf.gz')
assert == 'b14'
def test_gates(mydir):
c = verilog.parse(mydir / 'gates.v')
df = sdf.parse(mydir / 'gates.sdf')
c = verilog.load(mydir / 'gates.v')
df = sdf.load(mydir / 'gates.sdf')
lt = df.annotation(c, pin_index, dataset=1)
nand_a = c.cells['nandgate'].ins[0]
nand_b = c.cells['nandgate'].ins[1]


@ -2,7 +2,7 @@ from kyupy import stil @@ -2,7 +2,7 @@ from kyupy import stil
def test_b14(mydir):
s = stil.parse(mydir / 'b14.stuck.stil.gz')
s = stil.load(mydir / 'b14.stuck.stil.gz')
assert 10 == len(s.signal_groups)
assert 1 == len(s.scan_chains)
assert 2163 == len(s.calls)


@ -5,5 +5,4 @@ def test_b01(mydir): @@ -5,5 +5,4 @@ def test_b01(mydir):
with open(mydir / 'b01.v', 'r') as f:
modules = verilog.parse(
assert modules is not None
assert verilog.parse(mydir / 'b01.v') is not None
assert verilog.load(mydir / 'b01.v') is not None


@ -1,11 +1,10 @@ @@ -1,11 +1,10 @@
import numpy as np
from kyupy.wave_sim import WaveSim, wave_eval, TMIN, TMAX
from kyupy.wave_sim import WaveSim, WaveSimCuda, wave_eval, TMIN, TMAX
from kyupy.logic_sim import LogicSim
from kyupy import verilog
from kyupy import sdf
from kyupy import verilog, sdf, logic
from kyupy.saed import pin_index
from kyupy.packed_vectors import PackedVectors
from kyupy.wave_sim_cuda import WaveSimCuda
from kyupy.logic import MVArray, BPArray
def test_wave_eval():
@ -96,24 +95,29 @@ def test_wave_eval(): @@ -96,24 +95,29 @@ def test_wave_eval():
def compare_to_logic_sim(wsim):
tests = PackedVectors(wsim.sims, len(wsim.interface), 3)
tests = MVArray((len(wsim.interface), wsim.sims))
choices = np.asarray([logic.ZERO, logic.ONE, logic.RISE, logic.FALL], dtype=np.uint8)
rng = np.random.default_rng(10)[...] = rng.choice(choices,
tests_bp = BPArray(tests)
cdata = wsim.capture()
resp = tests.copy()
resp = MVArray(tests)
for iidx, inode in enumerate(wsim.interface):
if len(inode.ins) > 0:
for vidx in range(wsim.sims):
resp.set_value(vidx, iidx, 0 if cdata[iidx, vidx, 0] < 0.5 else 1)[iidx, vidx] = logic.ZERO if cdata[iidx, vidx, 0] < 0.5 else logic.ONE
# resp.set_value(vidx, iidx, 0 if cdata[iidx, vidx, 0] < 0.5 else 1)
lsim = LogicSim(wsim.circuit, len(tests), 3)
lsim = LogicSim(wsim.circuit, len(tests_bp))
exp = tests.copy()
exp_bp = BPArray(tests_bp)
exp = MVArray(exp_bp)
for i in range(8):
exp_str = exp[i].replace('R', '1').replace('F', '0').replace('P', '0').replace('N', '1')
@ -122,24 +126,24 @@ def compare_to_logic_sim(wsim): @@ -122,24 +126,24 @@ def compare_to_logic_sim(wsim):
def test_b14(mydir):
c = verilog.parse(mydir / 'b14.v.gz', branchforks=True)
df = sdf.parse(mydir / 'b14.sdf.gz')
c = verilog.load(mydir / 'b14.v.gz', branchforks=True)
df = sdf.load(mydir / 'b14.sdf.gz')
lt = df.annotation(c, pin_index)
wsim = WaveSim(c, lt, 8)
def test_b14_strip_forks(mydir):
c = verilog.parse(mydir / 'b14.v.gz', branchforks=True)
df = sdf.parse(mydir / 'b14.sdf.gz')
c = verilog.load(mydir / 'b14.v.gz', branchforks=True)
df = sdf.load(mydir / 'b14.sdf.gz')
lt = df.annotation(c, pin_index)
wsim = WaveSim(c, lt, 8, strip_forks=True)
def test_b14_cuda(mydir):
c = verilog.parse(mydir / 'b14.v.gz', branchforks=True)
df = sdf.parse(mydir / 'b14.sdf.gz')
c = verilog.load(mydir / 'b14.v.gz', branchforks=True)
df = sdf.load(mydir / 'b14.sdf.gz')
lt = df.annotation(c, pin_index)
wsim = WaveSimCuda(c, lt, 8)
