Browse Source

switch to new wave_sim, silence occupancy warnings

devel
Stefan Holst 2 years ago
parent
commit
8da4a62bce
  1. 2
      src/kyupy/__init__.py
  2. 1034
      src/kyupy/wave_sim.py
  3. 509
      src/kyupy/wave_sim4.py
  4. 961
      src/kyupy/wave_sim_old.py
  5. 248
      tests/test_wave_sim.py
  6. 166
      tests/test_wave_sim4.py
  7. 138
      tests/test_wave_sim_old.py

2
src/kyupy/__init__.py

@ -211,6 +211,8 @@ if importlib.util.find_spec('numba') is not None:
except CudaSupportError: except CudaSupportError:
log.warn('Cuda unavailable. Falling back to pure Python.') log.warn('Cuda unavailable. Falling back to pure Python.')
cuda = MockCuda() cuda = MockCuda()
from numba.core import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = False
else: else:
numba = MockNumba() numba = MockNumba()
"""If Numba is available on the system, it is the actual ``numba`` package. """If Numba is available on the system, it is the actual ``numba`` package.

1034
src/kyupy/wave_sim.py

File diff suppressed because it is too large Load Diff

509
src/kyupy/wave_sim4.py

@ -1,509 +0,0 @@
"""High-throughput combinational logic timing simulators.
These simulators work similarly to :py:class:`~kyupy.logic_sim.LogicSim`.
They propagate values through the combinational circuit from (pseudo) primary inputs to (pseudo) primary outputs.
Instead of propagating logic values, these simulators propagate signal histories (waveforms).
They are designed to run many simulations in parallel and while their latencies are quite high, they can achieve
high throughput.
The simulators are not event-based and are not capable of simulating sequential circuits directly.
"""
import math
import numpy as np
from . import numba, cuda, hr_bytes
from .sim import SimOps
TMAX = np.float32(2 ** 127)
"""A large 32-bit floating point value used to mark the end of a waveform."""
TMAX_OVL = np.float32(1.1 * 2 ** 127)
"""A large 32-bit floating point value used to mark the end of a waveform that
may be incomplete due to an overflow."""
TMIN = np.float32(-2 ** 127)
"""A large negative 32-bit floating point value used at the beginning of waveforms that start with logic-1."""
class WaveSim(SimOps):
"""A waveform-based combinational logic timing simulator running on CPU.
:param circuit: The circuit to simulate.
:param timing: The timing annotation of the circuit (see :py:func:`kyupy.sdf.DelayFile.annotation` for details)
:param sims: The number of parallel simulations.
:param c_caps: The number of floats available in each waveform. Values must be positive and a multiple of 4.
Waveforms encode the signal switching history by storing transition times.
The waveform capacity roughly corresponds to the number of transitions
that can be stored. A capacity of ``n`` can store at least ``n-2`` transitions. If more transitions are
generated during simulation, the latest glitch is removed (freeing up two transition times) and an overflow
flag is set. If an integer is given, all waveforms are set to that same capacity. With an array of length
``len(circuit.lines)`` the capacity is set for each intermediate waveform individually.
:param strip_forks: If enabled, the simulator will not evaluate fork nodes explicitly. This saves simulation time
by reducing the number of nodes to simulate, but (interconnect) delay annotations of lines read by fork nodes
are ignored.
:param keep_waveforms: If disabled, memory of intermediate signal waveforms will be re-used. This greatly reduces
memory footprint, but intermediate signal waveforms become unaccessible after a propagation.
"""
def __init__(self, circuit, timing, sims=8, c_caps=16, c_reuse=False, strip_forks=False):
assert c_caps > 0 and c_caps % 4 == 0
super().__init__(circuit, c_caps=c_caps//4, c_reuse=c_reuse, strip_forks=strip_forks)
self.sims = sims
self.c_len *= 4
self.vat[...,0:2] *= 4
self.timing = np.zeros((self.c_len, 2, 2))
self.timing[:len(timing)] = timing
self.c = np.zeros((self.c_len, sims), dtype=np.float32) + TMAX
self.s = np.zeros((len(self.s_nodes), sims, 11), dtype=np.float32)
"""Information about the logic values and transitions around the sequential elements (flip-flops) and ports.
The first 3 values are read by ``s_to_c()``.
The remaining values are written by ``c_to_s()``.
The elements are as follows:
* ``s[..., 0]`` (P)PI initial value
* ``s[..., 1]`` (P)PI transition time
* ``s[..., 2]`` (P)PI final value
* ``s[..., 3]`` (P)PO initial value
* ``s[..., 4]`` (P)PO earliest arrival time (EAT): The time at which the output transitioned from its initial value.
* ``s[..., 5]`` (P)PO latest stabilization time (LST): The time at which the output settled to its final value.
* ``s[..., 6]`` (P)PO final value
* ``s[..., 7]`` (P)PO capture value: probability of capturing a 1 at a given capture time
* ``s[..., 8]`` (P)PO sampled capture value: decided by random sampling according to a given seed.
* ``s[..., 9]`` (P)PO sampled capture slack: (capture time - LST) - decided by random sampling according to a given seed.
* ``s[..., 10]`` Overflow indicator: If non-zero, some signals in the input cone of this output had more
transitions than specified in ``c_caps``. Some transitions have been discarded, the
final values in the waveforms are still valid.
"""
self.params = np.zeros((sims, 4), dtype=np.float32)
self.params[...,0] = 1.0
self.nbytes = sum([a.nbytes for a in (self.c, self.s, self.vat, self.ops, self.params)])
self.pi_s_locs = np.flatnonzero(self.vat[self.ppi_offset+np.arange(len(self.circuit.io_nodes)), 0] >= 0)
self.po_s_locs = np.flatnonzero(self.vat[self.ppo_offset+np.arange(len(self.circuit.io_nodes)), 0] >= 0)
self.ppio_s_locs = np.arange(len(self.circuit.io_nodes), len(self.s_nodes))
self.pippi_s_locs = np.concatenate([self.pi_s_locs, self.ppio_s_locs])
self.poppo_s_locs = np.concatenate([self.po_s_locs, self.ppio_s_locs])
self.pi_c_locs = self.vat[self.ppi_offset+self.pi_s_locs, 0]
self.po_c_locs = self.vat[self.ppo_offset+self.po_s_locs, 0]
self.ppi_c_locs = self.vat[self.ppi_offset+self.ppio_s_locs, 0]
self.ppo_c_locs = self.vat[self.ppo_offset+self.ppio_s_locs, 0]
self.pippi_c_locs = np.concatenate([self.pi_c_locs, self.ppi_c_locs])
self.poppo_c_locs = np.concatenate([self.po_c_locs, self.ppo_c_locs])
def __repr__(self):
return f'<{type(self).__name__} {self.circuit.name} sims={self.sims} ops={len(self.ops)} ' + \
f'levels={len(self.level_starts)} mem={hr_bytes(self.nbytes)}>'
def s_to_c(self):
"""Transfers values of sequential elements and primary inputs to the combinational portion.
Based on the data in ``self.s``, waveforms are generated on the input lines of the circuit.
It modifies ``self.c``.
"""
sins = np.moveaxis(self.s[self.pippi_s_locs], -1, 0)
cond = (sins[2] != 0) + 2*(sins[0] != 0) # choices order: 0 R F 1
self.c[self.pippi_c_locs] = np.choose(cond, [TMAX, sins[1], TMIN, TMIN])
self.c[self.pippi_c_locs+1] = np.choose(cond, [TMAX, TMAX, sins[1], TMAX])
self.c[self.pippi_c_locs+2] = TMAX
def c_prop(self, sims=None, sd=0.0, seed=1):
"""Propagates all waveforms from the (pseudo) primary inputs to the (pseudo) primary outputs.
:param sims: Number of parallel simulations to execute. If None, all available simulations are performed.
:param sd: Standard deviation for injection of random delay variation. Active, if value is positive.
:param seed: Random seed for delay variations.
"""
sims = min(sims or self.sims, self.sims)
for op_start, op_stop in zip(self.level_starts, self.level_stops):
level_eval_cpu(self.ops, op_start, op_stop, self.c, self.vat, 0, sims,
self.timing, self.params, sd, seed)
def c_to_s(self, time=TMAX, sd=0.0, seed=1):
"""Simulates a capture operation at all sequential elements and primary outputs.
Propagated waveforms in ``self.c`` at and around the given capture time are analyzed and
the results are stored in ``self.s``.
:param time: The desired capture time. By default, a capture of the settled value is performed.
:param sd: A standard deviation for uncertainty in the actual capture time.
:param seed: The random seed for a capture with uncertainty.
"""
for s_loc, (c_loc, c_len, _) in zip(self.poppo_s_locs, self.vat[self.ppo_offset+self.poppo_s_locs]):
for vector in range(self.sims):
self.s[s_loc, vector, 3:] = wave_capture_cpu(self.c, c_loc, c_len, vector, time=time, sd=sd, seed=seed)
def s_ppo_to_ppi(self, time=0.0):
"""Re-assigns the last sampled capture to the appropriate pseudo-primary inputs (PPI).
Each PPI transition is constructed from its previous final value, the
given time, and the sampled captured value of its PPO. Reads and modifies ``self.s``.
:param time: The transition time at the inputs (usually 0.0).
"""
self.s[self.ppio_s_locs, :, 0] = self.s[self.ppio_s_locs, :, 2]
self.s[self.ppio_s_locs, :, 1] = time
self.s[self.ppio_s_locs, :, 2] = self.s[self.ppio_s_locs, :, 8]
@numba.njit
def rand_gauss_cpu(seed, sd):
clamp = 0.5
if sd <= 0.0:
return 1.0
while True:
x = -6.0
for _ in range(12):
seed = int(0xDEECE66D) * seed + 0xB
x += float((seed >> 8) & 0xffffff) / float(1 << 24)
x *= sd
if abs(x) <= clamp:
break
return x + 1.0
@numba.njit
def wave_eval_cpu(op, cbuf, vat, st_idx, line_times, param, sd=0.0, seed=0):
lut, z_idx, a_idx, b_idx, c_idx, d_idx = op
# >>> same code as wave_eval_cpu (except rand_gauss_*pu()-call) >>>
overflows = int(0)
_seed = (seed << 4) + (z_idx << 20) + (st_idx << 1)
a_mem = vat[a_idx, 0]
b_mem = vat[b_idx, 0]
c_mem = vat[c_idx, 0]
d_mem = vat[d_idx, 0]
z_mem, z_cap, _ = vat[z_idx]
a_cur = int(0)
b_cur = int(0)
c_cur = int(0)
d_cur = int(0)
z_cur = lut & 1
if z_cur == 1:
cbuf[z_mem, st_idx] = TMIN
a = cbuf[a_mem, st_idx] + line_times[a_idx, 0, z_cur] * rand_gauss_cpu(_seed ^ a_mem ^ z_cur, sd) * param[0]
if int(param[1]) == a_idx: a += param[2+z_cur]
b = cbuf[b_mem, st_idx] + line_times[b_idx, 0, z_cur] * rand_gauss_cpu(_seed ^ b_mem ^ z_cur, sd) * param[0]
if int(param[1]) == b_idx: b += param[2+z_cur]
c = cbuf[c_mem, st_idx] + line_times[c_idx, 0, z_cur] * rand_gauss_cpu(_seed ^ c_mem ^ z_cur, sd) * param[0]
if int(param[1]) == c_idx: c += param[2+z_cur]
d = cbuf[d_mem, st_idx] + line_times[d_idx, 0, z_cur] * rand_gauss_cpu(_seed ^ d_mem ^ z_cur, sd) * param[0]
if int(param[1]) == d_idx: d += param[2+z_cur]
previous_t = TMIN
current_t = min(a, b, c, d)
inputs = int(0)
while current_t < TMAX:
z_val = z_cur & 1
if a == current_t:
a_cur += 1
a = cbuf[a_mem + a_cur, st_idx]
a += line_times[a_idx, 0, z_val ^ 1] * rand_gauss_cpu(_seed ^ a_mem ^ z_val ^ 1, sd) * param[0]
thresh = line_times[a_idx, 1, z_val] * rand_gauss_cpu(_seed ^ a_mem ^ z_val, sd) * param[0]
if int(param[1]) == a_idx:
a += param[2+(z_val^1)]
thresh += param[2+z_val]
inputs ^= 1
next_t = a
elif b == current_t:
b_cur += 1
b = cbuf[b_mem + b_cur, st_idx]
b += line_times[b_idx, 0, z_val ^ 1] * rand_gauss_cpu(_seed ^ b_mem ^ z_val ^ 1, sd) * param[0]
thresh = line_times[b_idx, 1, z_val] * rand_gauss_cpu(_seed ^ b_mem ^ z_val, sd) * param[0]
if int(param[1]) == b_idx:
b += param[2+(z_val^1)]
thresh += param[2+z_val]
inputs ^= 2
next_t = b
elif c == current_t:
c_cur += 1
c = cbuf[c_mem + c_cur, st_idx]
c += line_times[c_idx, 0, z_val ^ 1] * rand_gauss_cpu(_seed ^ c_mem ^ z_val ^ 1, sd) * param[0]
thresh = line_times[c_idx, 1, z_val] * rand_gauss_cpu(_seed ^ c_mem ^ z_val, sd) * param[0]
if int(param[1]) == c_idx:
c += param[2+(z_val^1)]
thresh += param[2+z_val]
inputs ^= 4
next_t = c
else:
d_cur += 1
d = cbuf[d_mem + d_cur, st_idx]
d += line_times[d_idx, 0, z_val ^ 1] * rand_gauss_cpu(_seed ^ d_mem ^ z_val ^ 1, sd) * param[0]
thresh = line_times[d_idx, 1, z_val] * rand_gauss_cpu(_seed ^ d_mem ^ z_val, sd) * param[0]
if int(param[1]) == d_idx:
d += param[2+(z_val^1)]
thresh += param[2+z_val]
inputs ^= 8
next_t = d
if (z_cur & 1) != ((lut >> inputs) & 1):
# we generate a toggle in z_mem, if:
# ( it is the first toggle in z_mem OR
# following toggle is earlier OR
# pulse is wide enough ) AND enough space in z_mem.
if z_cur == 0 or next_t < current_t or (current_t - previous_t) > thresh:
if z_cur < (z_cap - 1):
cbuf[z_mem + z_cur, st_idx] = current_t
previous_t = current_t
z_cur += 1
else:
overflows += 1
previous_t = cbuf[z_mem + z_cur - 1, st_idx]
z_cur -= 1
else:
z_cur -= 1
previous_t = cbuf[z_mem + z_cur - 1, st_idx] if z_cur > 0 else TMIN
current_t = min(a, b, c, d)
# generate overflow flag or propagate from input
cbuf[z_mem + z_cur, st_idx] = TMAX_OVL if overflows > 0 else max(a, b, c, d)
@numba.njit
def level_eval_cpu(ops, op_start, op_stop, c, vat, st_start, st_stop, line_times, params, sd, seed):
overflows = 0
for op_idx in range(op_start, op_stop):
op = ops[op_idx]
for st_idx in range(st_start, st_stop):
wave_eval_cpu(op, c, vat, st_idx, line_times, params[st_idx], sd, seed)
@numba.njit
def wave_capture_cpu(c, c_loc, c_len, vector, time=TMAX, sd=0.0, seed=1):
s_sqrt2 = sd * math.sqrt(2)
m = 0.5
acc = 0.0
eat = TMAX
lst = TMIN
tog = 0
ovl = 0
val = int(0)
final = int(0)
w = c[c_loc:c_loc+c_len, vector]
for t in w:
if t >= TMAX:
if t == TMAX_OVL:
ovl = 1
break
m = -m
final ^= 1
if t < time:
val ^= 1
if t <= TMIN: continue
if s_sqrt2 > 0:
acc += m * (1 + math.erf((t - time) / s_sqrt2))
eat = min(eat, t)
lst = max(lst, t)
tog += 1
if s_sqrt2 > 0:
if m < 0:
acc += 1
if acc >= 0.99:
val = 1
elif acc > 0.01:
seed = (seed << 4) + (vector << 20) + c_loc
seed = int(0xDEECE66D) * seed + 0xB
seed = int(0xDEECE66D) * seed + 0xB
rnd = float((seed >> 8) & 0xffffff) / float(1 << 24)
val = rnd < acc
else:
val = 0
else:
acc = val
return (w[0] <= TMIN), eat, lst, final, acc, val, 0, ovl
class WaveSimCuda(WaveSim):
"""A GPU-accelerated waveform-based combinational logic timing simulator.
The API is the same as for :py:class:`WaveSim`.
All internal memories are mirrored into GPU memory upon construction.
Some operations like access to single waveforms can involve large communication overheads.
"""
def __init__(self, circuit, timing, sims=8, c_caps=16, c_reuse=False, strip_forks=False):
super().__init__(circuit, timing, sims, c_caps, c_reuse, strip_forks)
self.c = cuda.to_device(self.c)
self.s = cuda.to_device(self.s)
self.ops = cuda.to_device(self.ops)
self.vat = cuda.to_device(self.vat)
self.timing = cuda.to_device(self.timing)
self.params = cuda.to_device(self.params)
self._block_dim = (32, 16)
# TODO implement on GPU
#def s_to_c(self):
def _grid_dim(self, x, y):
gx = math.ceil(x / self._block_dim[0])
gy = math.ceil(y / self._block_dim[1])
return gx, gy
def c_prop(self, sims=None, sd=0.0, seed=1):
sims = min(sims or self.sims, self.sims)
for op_start, op_stop in zip(self.level_starts, self.level_stops):
grid_dim = self._grid_dim(sims, op_stop - op_start)
wave_eval_gpu[grid_dim, self._block_dim](self.ops, op_start, op_stop, self.c, self.vat, int(0),
sims, self.timing, self.params, sd, seed)
cuda.synchronize()
# TODO implement on GPU
#def c_to_s(self):
# TODO implement on GPU
#def s_ppo_to_ppi(self, time=0.0):
@cuda.jit(device=True)
def rand_gauss_gpu(seed, sd):
clamp = 0.5
if sd <= 0.0:
return 1.0
while True:
x = -6.0
for _ in range(12):
seed = int(0xDEECE66D) * seed + 0xB
x += float((seed >> 8) & 0xffffff) / float(1 << 24)
x *= sd
if abs(x) <= clamp:
break
return x + 1.0
@cuda.jit()
def wave_eval_gpu(ops, op_start, op_stop, cbuf, vat, st_start, st_stop, line_times, param, sd, seed):
x, y = cuda.grid(2)
st_idx = st_start + x
op_idx = op_start + y
if st_idx >= st_stop: return
if op_idx >= op_stop: return
lut = ops[op_idx, 0]
z_idx = ops[op_idx, 1]
a_idx = ops[op_idx, 2]
b_idx = ops[op_idx, 3]
c_idx = ops[op_idx, 4]
d_idx = ops[op_idx, 5]
param = param[st_idx]
# >>> same code as wave_eval_cpu (except rand_gauss_*pu()-call) >>>
overflows = int(0)
_seed = (seed << 4) + (z_idx << 20) + (st_idx << 1)
a_mem = vat[a_idx, 0]
b_mem = vat[b_idx, 0]
c_mem = vat[c_idx, 0]
d_mem = vat[d_idx, 0]
z_mem, z_cap, _ = vat[z_idx]
a_cur = int(0)
b_cur = int(0)
c_cur = int(0)
d_cur = int(0)
z_cur = lut & 1
if z_cur == 1:
cbuf[z_mem, st_idx] = TMIN
a = cbuf[a_mem, st_idx] + line_times[a_idx, 0, z_cur] * rand_gauss_gpu(_seed ^ a_mem ^ z_cur, sd) * param[0]
if int(param[1]) == a_idx: a += param[2+z_cur]
b = cbuf[b_mem, st_idx] + line_times[b_idx, 0, z_cur] * rand_gauss_gpu(_seed ^ b_mem ^ z_cur, sd) * param[0]
if int(param[1]) == b_idx: b += param[2+z_cur]
c = cbuf[c_mem, st_idx] + line_times[c_idx, 0, z_cur] * rand_gauss_gpu(_seed ^ c_mem ^ z_cur, sd) * param[0]
if int(param[1]) == c_idx: c += param[2+z_cur]
d = cbuf[d_mem, st_idx] + line_times[d_idx, 0, z_cur] * rand_gauss_gpu(_seed ^ d_mem ^ z_cur, sd) * param[0]
if int(param[1]) == d_idx: d += param[2+z_cur]
previous_t = TMIN
current_t = min(a, b, c, d)
inputs = int(0)
while current_t < TMAX:
z_val = z_cur & 1
if a == current_t:
a_cur += 1
a = cbuf[a_mem + a_cur, st_idx]
a += line_times[a_idx, 0, z_val ^ 1] * rand_gauss_gpu(_seed ^ a_mem ^ z_val ^ 1, sd) * param[0]
thresh = line_times[a_idx, 1, z_val] * rand_gauss_gpu(_seed ^ a_mem ^ z_val, sd) * param[0]
if int(param[1]) == a_idx:
a += param[2+(z_val^1)]
thresh += param[2+z_val]
inputs ^= 1
next_t = a
elif b == current_t:
b_cur += 1
b = cbuf[b_mem + b_cur, st_idx]
b += line_times[b_idx, 0, z_val ^ 1] * rand_gauss_gpu(_seed ^ b_mem ^ z_val ^ 1, sd) * param[0]
thresh = line_times[b_idx, 1, z_val] * rand_gauss_gpu(_seed ^ b_mem ^ z_val, sd) * param[0]
if int(param[1]) == b_idx:
b += param[2+(z_val^1)]
thresh += param[2+z_val]
inputs ^= 2
next_t = b
elif c == current_t:
c_cur += 1
c = cbuf[c_mem + c_cur, st_idx]
c += line_times[c_idx, 0, z_val ^ 1] * rand_gauss_gpu(_seed ^ c_mem ^ z_val ^ 1, sd) * param[0]
thresh = line_times[c_idx, 1, z_val] * rand_gauss_gpu(_seed ^ c_mem ^ z_val, sd) * param[0]
if int(param[1]) == c_idx:
c += param[2+(z_val^1)]
thresh += param[2+z_val]
inputs ^= 4
next_t = c
else:
d_cur += 1
d = cbuf[d_mem + d_cur, st_idx]
d += line_times[d_idx, 0, z_val ^ 1] * rand_gauss_gpu(_seed ^ d_mem ^ z_val ^ 1, sd) * param[0]
thresh = line_times[d_idx, 1, z_val] * rand_gauss_gpu(_seed ^ d_mem ^ z_val, sd) * param[0]
if int(param[1]) == d_idx:
d += param[2+(z_val^1)]
thresh += param[2+z_val]
inputs ^= 8
next_t = d
if (z_cur & 1) != ((lut >> inputs) & 1):
# we generate a toggle in z_mem, if:
# ( it is the first toggle in z_mem OR
# following toggle is earlier OR
# pulse is wide enough ) AND enough space in z_mem.
if z_cur == 0 or next_t < current_t or (current_t - previous_t) > thresh:
if z_cur < (z_cap - 1):
cbuf[z_mem + z_cur, st_idx] = current_t
previous_t = current_t
z_cur += 1
else:
overflows += 1
previous_t = cbuf[z_mem + z_cur - 1, st_idx]
z_cur -= 1
else:
z_cur -= 1
previous_t = cbuf[z_mem + z_cur - 1, st_idx] if z_cur > 0 else TMIN
current_t = min(a, b, c, d)
# generate overflow flag or propagate from input
cbuf[z_mem + z_cur, st_idx] = TMAX_OVL if overflows > 0 else max(a, b, c, d)

961
src/kyupy/wave_sim_old.py

@ -0,0 +1,961 @@
"""High-throughput combinational logic timing simulators.
These simulators work similarly to :py:class:`~kyupy.logic_sim.LogicSim`.
They propagate values through the combinational circuit from (pseudo) primary inputs to (pseudo) primary outputs.
Instead of propagating logic values, these simulators propagate signal histories (waveforms).
They are designed to run many simulations in parallel and while their latencies are quite high, they can achieve
high throughput.
The simulators are not event-based and are not capable of simulating sequential circuits directly.
Two simulators are available: :py:class:`WaveSim` runs on the CPU, and the derived class
:py:class:`WaveSimCuda` runs on the GPU.
"""
import math
from bisect import bisect, insort_left
import numpy as np
from . import numba, cuda, hr_bytes
TMAX = np.float32(2 ** 127)
"""A large 32-bit floating point value used to mark the end of a waveform."""
TMAX_OVL = np.float32(1.1 * 2 ** 127)
"""A large 32-bit floating point value used to mark the end of a waveform that
may be incomplete due to an overflow."""
TMIN = np.float32(-2 ** 127)
"""A large negative 32-bit floating point value used at the beginning of waveforms that start with logic-1."""
class Heap:
def __init__(self):
self.chunks = dict() # map start location to chunk size
self.released = list() # chunks that were released
self.current_size = 0
self.max_size = 0
def alloc(self, size):
for idx, loc in enumerate(self.released):
if self.chunks[loc] == size:
del self.released[idx]
return loc
if self.chunks[loc] > size: # split chunk
chunksize = self.chunks[loc]
self.chunks[loc] = size
self.chunks[loc + size] = chunksize - size
self.released[idx] = loc + size # move released pointer: loc -> loc+size
return loc
# no previously released chunk; make new one
loc = self.current_size
self.chunks[loc] = size
self.current_size += size
self.max_size = max(self.max_size, self.current_size)
return loc
def free(self, loc):
size = self.chunks[loc]
if loc + size == self.current_size: # end of managed area, remove chunk
del self.chunks[loc]
self.current_size -= size
# check and remove prev chunk if free
if len(self.released) > 0:
prev = self.released[-1]
if prev + self.chunks[prev] == self.current_size:
chunksize = self.chunks[prev]
del self.chunks[prev]
del self.released[-1]
self.current_size -= chunksize
return
released_idx = bisect(self.released, loc)
if released_idx < len(self.released) and loc + size == self.released[released_idx]: # next chunk is free, merge
chunksize = size + self.chunks[loc + size]
del self.chunks[loc + size]
self.chunks[loc] = chunksize
size = self.chunks[loc]
self.released[released_idx] = loc
else:
insort_left(self.released, loc) # put in a new release
if released_idx > 0: # check if previous chunk is free
prev = self.released[released_idx - 1]
if prev + self.chunks[prev] == loc: # previous chunk is adjacent to freed one, merge
chunksize = size + self.chunks[prev]
del self.chunks[loc]
self.chunks[prev] = chunksize
del self.released[released_idx]
def __repr__(self):
r = []
for loc in sorted(self.chunks.keys()):
size = self.chunks[loc]
released_idx = bisect(self.released, loc)
is_released = released_idx > 0 and len(self.released) > 0 and self.released[released_idx - 1] == loc
r.append(f'{loc:5d}: {"free" if is_released else "used"} {size}')
return "\n".join(r)
class WaveSim:
"""A waveform-based combinational logic timing simulator running on CPU.
:param circuit: The circuit to simulate.
:param timing: The timing annotation of the circuit (see :py:func:`kyupy.sdf.DelayFile.annotation` for details)
:param sims: The number of parallel simulations.
:param wavecaps: The number of floats available in each waveform. Waveforms are encoding the signal switching
history by storing transition times. The waveform capacity roughly corresponds to the number of transitions
that can be stored. A capacity of ``n`` can store at least ``n-2`` transitions. If more transitions are
generated during simulation, the latest glitch is removed (freeing up two transition times) and an overflow
flag is set. If an integer is given, all waveforms are set to that same capacity. With an array of length
``len(circuit.lines)`` the capacity can be controlled for each intermediate waveform individually.
:param strip_forks: If enabled, the simulator will not evaluate fork nodes explicitly. This saves simulation time
by reducing the number of nodes to simulate, but (interconnect) delay annotations of lines read by fork nodes
are ignored.
:param keep_waveforms: If disabled, memory of intermediate signal waveforms will be re-used. This greatly reduces
memory footprint, but intermediate signal waveforms become unaccessible after a propagation.
"""
def __init__(self, circuit, timing, sims=8, wavecaps=16, strip_forks=False, keep_waveforms=True):
self.circuit = circuit
self.sims = sims
self.overflows = 0
self.interface = list(circuit.io_nodes) + [n for n in circuit.nodes if 'dff' in n.kind.lower()]
self.lst_eat_valid = False
self.cdata = np.zeros((len(self.interface), sims, 7), dtype='float32')
self.sdata = np.zeros((sims, 4), dtype='float32')
self.sdata[...,0] = 1.0
if isinstance(wavecaps, int):
wavecaps = [wavecaps] * len(circuit.lines)
intf_wavecap = 4 # sufficient for storing only 1 transition.
# indices for state allocation table (sat)
self.zero_idx = len(circuit.lines)
self.tmp_idx = self.zero_idx + 1
self.ppi_offset = self.tmp_idx + 1
self.ppo_offset = self.ppi_offset + len(self.interface)
self.sat_length = self.ppo_offset + len(self.interface)
# translate circuit structure into self.ops
ops = []
interface_dict = dict((n, i) for i, n in enumerate(self.interface))
for n in circuit.topological_order():
if n in interface_dict:
inp_idx = self.ppi_offset + interface_dict[n]
if len(n.outs) > 0 and n.outs[0] is not None: # first output of a PI/PPI
ops.append((0b1010, n.outs[0].index, inp_idx, self.zero_idx))
if 'dff' in n.kind.lower(): # second output of DFF is inverted
if len(n.outs) > 1 and n.outs[1] is not None:
ops.append((0b0101, n.outs[1].index, inp_idx, self.zero_idx))
else: # if not DFF, no output is inverted.
for o_line in n.outs[1:]:
if o_line is not None:
ops.append((0b1010, o_line.index, inp_idx, self.zero_idx))
else: # regular node, not PI/PPI or PO/PPO
o0_idx = n.outs[0].index if len(n.outs) > 0 and n.outs[0] is not None else self.tmp_idx
i0_idx = n.ins[0].index if len(n.ins) > 0 and n.ins[0] is not None else self.zero_idx
i1_idx = n.ins[1].index if len(n.ins) > 1 and n.ins[1] is not None else self.zero_idx
kind = n.kind.lower()
if kind == '__fork__':
if not strip_forks:
for o_line in n.outs:
if o_line is not None:
ops.append((0b1010, o_line.index, i0_idx, i1_idx))
elif kind.startswith('nand'):
ops.append((0b0111, o0_idx, i0_idx, i1_idx))
elif kind.startswith('nor'):
ops.append((0b0001, o0_idx, i0_idx, i1_idx))
elif kind.startswith('and'):
ops.append((0b1000, o0_idx, i0_idx, i1_idx))
elif kind.startswith('or'):
ops.append((0b1110, o0_idx, i0_idx, i1_idx))
elif kind.startswith('xor'):
ops.append((0b0110, o0_idx, i0_idx, i1_idx))
elif kind.startswith('xnor'):
ops.append((0b1001, o0_idx, i0_idx, i1_idx))
elif kind.startswith('not') or kind.startswith('inv') or kind.startswith('ibuf'):
ops.append((0b0101, o0_idx, i0_idx, i1_idx))
elif kind.startswith('buf') or kind.startswith('nbuf'):
ops.append((0b1010, o0_idx, i0_idx, i1_idx))
elif kind.startswith('__const1__') or kind.startswith('tieh'):
ops.append((0b0101, o0_idx, i0_idx, i1_idx))
elif kind.startswith('__const0__') or kind.startswith('tiel'):
ops.append((0b1010, o0_idx, i0_idx, i1_idx))
else:
print('unknown gate type', kind)
self.ops = np.asarray(ops, dtype='int32')
# create a map from fanout lines to stem lines for fork stripping
stems = np.zeros(self.sat_length, dtype='int32') - 1 # default to -1: 'no fanout line'
if strip_forks:
for f in circuit.forks.values():
prev_line = f.ins[0]
while prev_line.driver.kind == '__fork__':
prev_line = prev_line.driver.ins[0]
stem_idx = prev_line.index
for ol in f.outs:
stems[ol] = stem_idx
# calculate level (distance from PI/PPI) and reference count for each line
levels = np.zeros(self.sat_length, dtype='int32')
ref_count = np.zeros(self.sat_length, dtype='int32')
level_starts = [0]
current_level = 1
for i, op in enumerate(self.ops):
# if we fork-strip, always take the stems for determining fan-in level
i0_idx = stems[op[2]] if stems[op[2]] >= 0 else op[2]
i1_idx = stems[op[3]] if stems[op[3]] >= 0 else op[3]
if levels[i0_idx] >= current_level or levels[i1_idx] >= current_level:
current_level += 1
level_starts.append(i)
levels[op[1]] = current_level # set level of the output line
ref_count[i0_idx] += 1
ref_count[i1_idx] += 1
self.level_starts = np.asarray(level_starts, dtype='int32')
self.level_stops = np.asarray(level_starts[1:] + [len(self.ops)], dtype='int32')
# state allocation table. maps line and interface indices to self.state memory locations
self.sat = np.zeros((self.sat_length, 3), dtype='int')
self.sat[:, 0] = -1
h = Heap()
# allocate and keep memory for special fields
self.sat[self.zero_idx] = h.alloc(intf_wavecap), intf_wavecap, 0
self.sat[self.tmp_idx] = h.alloc(intf_wavecap), intf_wavecap, 0
ref_count[self.zero_idx] += 1
ref_count[self.tmp_idx] += 1
# allocate and keep memory for PI/PPI, keep memory for PO/PPO (allocated later)
for i, n in enumerate(self.interface):
if len(n.outs) > 0:
self.sat[self.ppi_offset + i] = h.alloc(intf_wavecap), intf_wavecap, 0
ref_count[self.ppi_offset + i] += 1
if len(n.ins) > 0:
i0_idx = stems[n.ins[0]] if stems[n.ins[0]] >= 0 else n.ins[0]
ref_count[i0_idx] += 1
# allocate memory for the rest of the circuit
for op_start, op_stop in zip(self.level_starts, self.level_stops):
free_list = []
for op in self.ops[op_start:op_stop]:
# if we fork-strip, always take the stems
i0_idx = stems[op[2]] if stems[op[2]] >= 0 else op[2]
i1_idx = stems[op[3]] if stems[op[3]] >= 0 else op[3]
ref_count[i0_idx] -= 1
ref_count[i1_idx] -= 1
if ref_count[i0_idx] <= 0: free_list.append(self.sat[i0_idx, 0])
if ref_count[i1_idx] <= 0: free_list.append(self.sat[i1_idx, 0])
o_idx = op[1]
cap = wavecaps[o_idx]
self.sat[o_idx] = h.alloc(cap), cap, 0
if not keep_waveforms:
for loc in free_list:
h.free(loc)
# copy memory location and capacity from stems to fanout lines
for lidx, stem in enumerate(stems):
if stem >= 0: # if at a fanout line
self.sat[lidx] = self.sat[stem]
# copy memory location to PO/PPO area
for i, n in enumerate(self.interface):
if len(n.ins) > 0:
self.sat[self.ppo_offset + i] = self.sat[n.ins[0]]
# pad timing
self.timing = np.zeros((self.sat_length, 2, 2))
self.timing[:len(timing)] = timing
# allocate self.state
self.state = np.zeros((h.max_size, sims), dtype='float32') + TMAX
m1 = np.array([2 ** x for x in range(7, -1, -1)], dtype='uint8')
m0 = ~m1
self.mask = np.rollaxis(np.vstack((m0, m1)), 1)
def __repr__(self):
total_mem = self.state.nbytes + self.sat.nbytes + self.ops.nbytes + self.cdata.nbytes
return f'<WaveSim {self.circuit.name} sims={self.sims} ops={len(self.ops)} ' + \
f'levels={len(self.level_starts)} mem={hr_bytes(total_mem)}>'
def get_line_delay(self, line, polarity):
"""Returns the current delay of the given ``line`` and ``polarity`` in the simulation model."""
return self.timing[line, 0, polarity]
def set_line_delay(self, line, polarity, delay):
"""Sets a new ``delay`` for the given ``line`` and ``polarity`` in the simulation model."""
self.timing[line, 0, polarity] = delay
def assign(self, vectors, time=0.0, offset=0):
"""Assigns new values to the primary inputs and state-elements.
:param vectors: The values to assign preferably in 8-valued logic. The values are converted to
appropriate waveforms with or one transition (``RISE``, ``FALL``) no transitions
(``ZERO``, ``ONE``, and others).
:type vectors: :py:class:`~kyupy.logic.BPArray`
:param time: The transition time of the generated waveforms.
:param offset: The offset into the vector set. The vector assigned to the first simulator is
``vectors[offset]``.
"""
nvectors = min(len(vectors) - offset, self.sims)
for i in range(len(self.interface)):
ppi_loc = self.sat[self.ppi_offset + i, 0]
if ppi_loc < 0: continue
for p in range(nvectors):
vector = p + offset
a = vectors.data[i, :, vector // 8]
m = self.mask[vector % 8]
toggle = 0
if len(a) <= 2:
if a[0] & m[1]:
self.state[ppi_loc, p] = TMIN
toggle += 1
else:
if a[1] & m[1]:
self.state[ppi_loc, p] = TMIN
toggle += 1
if (a[2] & m[1]) and ((a[0] & m[1]) != (a[1] & m[1])):
self.state[ppi_loc + toggle, p] = time
toggle += 1
self.state[ppi_loc + toggle, p] = TMAX
def propagate(self, sims=None, sd=0.0, seed=1):
"""Propagates all waveforms from the (pseudo) primary inputs to the (pseudo) primary outputs.
:param sims: Number of parallel simulations to execute. If None, all available simulations are performed.
:param sd: Standard deviation for injection of random delay variation. Active, if value is positive.
:param seed: Random seed for delay variations.
"""
sims = min(sims or self.sims, self.sims)
for op_start, op_stop in zip(self.level_starts, self.level_stops):
self.overflows += level_eval(self.ops, op_start, op_stop, self.state, self.sat, 0, sims,
self.timing, self.sdata, sd, seed)
self.lst_eat_valid = False
def wave(self, line, vector):
# """Returns the desired waveform from the simulation state. Only valid, if simulator was
# instantiated with ``keep_waveforms=True``."""
if line < 0:
return [TMAX]
mem, wcap, _ = self.sat[line]
if mem < 0:
return [TMAX]
return self.state[mem:mem + wcap, vector]
def wave_ppi(self, i, vector):
return self.wave(self.ppi_offset + i, vector)
def wave_ppo(self, o, vector):
return self.wave(self.ppo_offset + o, vector)
def capture(self, time=TMAX, sd=0.0, seed=1, cdata=None, offset=0):
"""Simulates a capture operation at all state-elements and primary outputs.
The capture analyzes the propagated waveforms at and around the given capture time and returns
various results for each capture operation.
:param time: The desired capture time. By default, a capture of the settled value is performed.
:param sd: A standard deviation for uncertainty in the actual capture time.
:param seed: The random seed for a capture with uncertainty.
:param cdata: An array to copy capture data into (optional). See the return value for details.
:param offset: An offset into the supplied capture data array.
:return: The capture data as numpy array.
The 3-dimensional capture data array contains for each interface node (axis 0),
and each test (axis 1), seven values:
0. Probability of capturing a 1 at the given capture time (same as next value, if no
standard deviation given).
1. A capture value decided by random sampling according to above probability and given seed.
2. The final value (assume a very late capture time).
3. True, if there was a premature capture (capture error), i.e. final value is different
from captured value.
4. Earliest arrival time. The time at which the output transitioned from its initial value.
5. Latest stabilization time. The time at which the output transitioned to its final value.
6. Overflow indicator. If non-zero, some signals in the input cone of this output had more
transitions than specified in ``wavecaps``. Some transitions have been discarded, the
final values in the waveforms are still valid.
"""
for i, node in enumerate(self.interface):
if len(node.ins) == 0: continue
for p in range(self.sims):
self.cdata[i, p] = self.capture_wave(self.ppo_offset + i, p, time, sd, seed)
if cdata is not None:
assert offset < cdata.shape[1]
cap_dim = min(cdata.shape[1] - offset, self.sims)
cdata[:, offset:cap_dim + offset] = self.cdata[:, 0:cap_dim]
self.lst_eat_valid = True
return self.cdata
def reassign(self, time=0.0):
"""Re-assigns the last capture to the appropriate pseudo-primary inputs. Generates a new set of
waveforms at the PPIs that start with the previous final value of that PPI, and transitions at the
given time to the value captured in a previous simulation. :py:func:`~WaveSim.capture` must be called
prior to this function. The final value of each PPI is taken from the randomly sampled concrete logic
values in the capture data.
:param time: The transition time at the inputs (usually 0.0).
"""
for i in range(len(self.interface)):
ppi_loc = self.sat[self.ppi_offset + i, 0]
ppo_loc = self.sat[self.ppo_offset + i, 0]
if ppi_loc < 0 or ppo_loc < 0: continue
for sidx in range(self.sims):
ival = self.val(self.ppi_offset + i, sidx, TMAX) > 0.5
oval = self.cdata[i, sidx, 1] > 0.5
toggle = 0
if ival:
self.state[ppi_loc, sidx] = TMIN
toggle += 1
if ival != oval:
self.state[ppi_loc + toggle, sidx] = time
toggle += 1
self.state[ppi_loc + toggle, sidx] = TMAX
def eat(self, line, vector):
eat = TMAX
for t in self.wave(line, vector):
if t >= TMAX: break
if t <= TMIN: continue
eat = min(eat, t)
return eat
def lst(self, line, vector):
lst = TMIN
for t in self.wave(line, vector):
if t >= TMAX: break
if t <= TMIN: continue
lst = max(lst, t)
return lst
def lst_ppo(self, o, vector):
if not self.lst_eat_valid:
self.capture()
return self.cdata[o, vector, 5]
def toggles(self, line, vector):
tog = 0
for t in self.wave(line, vector):
if t >= TMAX: break
if t <= TMIN: continue
tog += 1
return tog
def _vals(self, idx, vector, times, sd=0.0):
s_sqrt2 = sd * math.sqrt(2)
m = 0.5
accs = [0.0] * len(times)
values = [0] * len(times)
for t in self.wave(idx, vector):
if t >= TMAX: break
for idx, time in enumerate(times):
if t < time:
values[idx] = values[idx] ^ 1
m = -m
if t <= TMIN: continue
if s_sqrt2 > 0:
for idx, time in enumerate(times):
accs[idx] += m * (1 + math.erf((t - time) / s_sqrt2))
if (m < 0) and (s_sqrt2 > 0):
for idx, time in enumerate(times):
accs[idx] += 1
if s_sqrt2 == 0:
return values
return accs
def vals(self, line, vector, times, sd=0):
return self._vals(line, vector, times, sd)
def val(self, line, vector, time=TMAX, sd=0):
return self.capture_wave(line, vector, time, sd)[0]
def vals_ppo(self, o, vector, times, sd=0):
return self._vals(self.ppo_offset + o, vector, times, sd)
def val_ppo(self, o, vector, time=TMAX, sd=0):
if not self.lst_eat_valid:
self.capture(time, sd)
return self.cdata[o, vector, 0]
def capture_wave(self, line, vector, time=TMAX, sd=0.0, seed=1):
s_sqrt2 = sd * math.sqrt(2)
m = 0.5
acc = 0.0
eat = TMAX
lst = TMIN
tog = 0
ovl = 0
val = int(0)
final = int(0)
for t in self.wave(line, vector):
if t >= TMAX:
if t == TMAX_OVL:
ovl = 1
break
m = -m
final ^= 1
if t < time:
val ^= 1
if t <= TMIN: continue
if s_sqrt2 > 0:
acc += m * (1 + math.erf((t - time) / s_sqrt2))
eat = min(eat, t)
lst = max(lst, t)
tog += 1
if s_sqrt2 > 0:
if m < 0:
acc += 1
if acc >= 0.99:
val = 1
elif acc > 0.01:
seed = (seed << 4) + (vector << 20) + (line-self.ppo_offset << 1)
seed = int(0xDEECE66D) * seed + 0xB
seed = int(0xDEECE66D) * seed + 0xB
rnd = float((seed >> 8) & 0xffffff) / float(1 << 24)
val = rnd < acc
else:
val = 0
else:
acc = val
return acc, val, final, (val != final), eat, lst, ovl
@numba.njit
def level_eval(ops, op_start, op_stop, state, sat, st_start, st_stop, line_times, sdata, sd, seed):
overflows = 0
for op_idx in range(op_start, op_stop):
op = ops[op_idx]
for st_idx in range(st_start, st_stop):
overflows += wave_eval(op, state, sat, st_idx, line_times, sdata[st_idx], sd, seed)
return overflows
@numba.njit
def rand_gauss(seed, sd):
clamp = 0.5
if sd <= 0.0:
return 1.0
while True:
x = -6.0
for _ in range(12):
seed = int(0xDEECE66D) * seed + 0xB
x += float((seed >> 8) & 0xffffff) / float(1 << 24)
x *= sd
if abs(x) <= clamp:
break
return x + 1.0
@numba.njit
def wave_eval(op, state, sat, st_idx, line_times, sdata, sd=0.0, seed=0):
lut, z_idx, a_idx, b_idx = op
overflows = int(0)
_seed = (seed << 4) + (z_idx << 20) + (st_idx << 1)
a_mem = sat[a_idx, 0]
b_mem = sat[b_idx, 0]
z_mem, z_cap, _ = sat[z_idx]
a_cur = int(0)
b_cur = int(0)
z_cur = lut & 1
if z_cur == 1:
state[z_mem, st_idx] = TMIN
a = state[a_mem, st_idx] + line_times[a_idx, 0, z_cur] * rand_gauss(_seed ^ a_mem ^ z_cur, sd) * sdata[0]
if int(sdata[1]) == a_idx: a += sdata[2+z_cur]
b = state[b_mem, st_idx] + line_times[b_idx, 0, z_cur] * rand_gauss(_seed ^ b_mem ^ z_cur, sd) * sdata[0]
if int(sdata[1]) == b_idx: b += sdata[2+z_cur]
previous_t = TMIN
current_t = min(a, b)
inputs = int(0)
while current_t < TMAX:
z_val = z_cur & 1
if b < a:
b_cur += 1
b = state[b_mem + b_cur, st_idx]
b += line_times[b_idx, 0, z_val ^ 1] * rand_gauss(_seed ^ b_mem ^ z_val ^ 1, sd) * sdata[0]
thresh = line_times[b_idx, 1, z_val] * rand_gauss(_seed ^ b_mem ^ z_val, sd) * sdata[0]
if int(sdata[1]) == b_idx:
b += sdata[2+(z_val^1)]
thresh += sdata[2+z_val]
inputs ^= 2
next_t = b
else:
a_cur += 1
a = state[a_mem + a_cur, st_idx]
a += line_times[a_idx, 0, z_val ^ 1] * rand_gauss(_seed ^ a_mem ^ z_val ^ 1, sd) * sdata[0]
thresh = line_times[a_idx, 1, z_val] * rand_gauss(_seed ^ a_mem ^ z_val, sd) * sdata[0]
if int(sdata[1]) == a_idx:
a += sdata[2+(z_val^1)]
thresh += sdata[2+z_val]
inputs ^= 1
next_t = a
if (z_cur & 1) != ((lut >> inputs) & 1):
# we generate a toggle in z_mem, if:
# ( it is the first toggle in z_mem OR
# following toggle is earlier OR
# pulse is wide enough ) AND enough space in z_mem.
if z_cur == 0 or next_t < current_t or (current_t - previous_t) > thresh:
if z_cur < (z_cap - 1):
state[z_mem + z_cur, st_idx] = current_t
previous_t = current_t
z_cur += 1
else:
overflows += 1
previous_t = state[z_mem + z_cur - 1, st_idx]
z_cur -= 1
else:
z_cur -= 1
if z_cur > 0:
previous_t = state[z_mem + z_cur - 1, st_idx]
else:
previous_t = TMIN
current_t = min(a, b)
if overflows > 0:
state[z_mem + z_cur, st_idx] = TMAX_OVL
else:
state[z_mem + z_cur, st_idx] = a if a > b else b # propagate overflow flags by storing biggest TMAX from input
return overflows
class WaveSimCuda(WaveSim):
"""A GPU-accelerated waveform-based combinational logic timing simulator.
The API is the same as for :py:class:`WaveSim`.
All internal memories are mirrored into GPU memory upon construction.
Some operations like access to single waveforms can involve large communication overheads.
"""
def __init__(self, circuit, timing, sims=8, wavecaps=16, strip_forks=False, keep_waveforms=True):
super().__init__(circuit, timing, sims, wavecaps, strip_forks, keep_waveforms)
self.tdata = np.zeros((len(self.interface), 3, (sims - 1) // 8 + 1), dtype='uint8')
self.d_state = cuda.to_device(self.state)
self.d_sat = cuda.to_device(self.sat)
self.d_ops = cuda.to_device(self.ops)
self.d_timing = cuda.to_device(self.timing)
self.d_tdata = cuda.to_device(self.tdata)
self.d_cdata = cuda.to_device(self.cdata)
self.d_sdata = cuda.to_device(self.sdata)
self._block_dim = (32, 16)
def __repr__(self):
total_mem = self.state.nbytes + self.sat.nbytes + self.ops.nbytes + self.timing.nbytes + \
self.tdata.nbytes + self.cdata.nbytes
return f'<WaveSimCuda {self.circuit.name} sims={self.sims} ops={len(self.ops)} ' + \
f'levels={len(self.level_starts)} mem={hr_bytes(total_mem)}>'
def get_line_delay(self, line, polarity):
return self.d_timing[line, 0, polarity]
def set_line_delay(self, line, polarity, delay):
self.d_timing[line, 0, polarity] = delay
def sdata_to_device(self):
cuda.to_device(self.sdata, to=self.d_sdata)
def assign(self, vectors, time=0.0, offset=0):
assert (offset % 8) == 0
byte_offset = offset // 8
assert byte_offset < vectors.data.shape[-1]
pdim = min(vectors.data.shape[-1] - byte_offset, self.tdata.shape[-1])
self.tdata[..., 0:pdim] = vectors.data[..., byte_offset:pdim + byte_offset]
if vectors.m == 2:
self.tdata[:, 2, 0:pdim] = 0
cuda.to_device(self.tdata, to=self.d_tdata)
grid_dim = self._grid_dim(self.sims, len(self.interface))
assign_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppi_offset,
len(self.interface), self.d_tdata, time)
def _grid_dim(self, x, y):
gx = math.ceil(x / self._block_dim[0])
gy = math.ceil(y / self._block_dim[1])
return gx, gy
def propagate(self, sims=None, sd=0.0, seed=1):
sims = min(sims or self.sims, self.sims)
for op_start, op_stop in zip(self.level_starts, self.level_stops):
grid_dim = self._grid_dim(sims, op_stop - op_start)
wave_kernel[grid_dim, self._block_dim](self.d_ops, op_start, op_stop, self.d_state, self.sat, int(0),
sims, self.d_timing, self.d_sdata, sd, seed)
cuda.synchronize()
self.lst_eat_valid = False
def wave(self, line, vector):
if line < 0:
return [TMAX]
mem, wcap, _ = self.sat[line]
if mem < 0:
return [TMAX]
return self.d_state[mem:mem + wcap, vector]
def capture(self, time=TMAX, sd=0, seed=1, cdata=None, offset=0):
grid_dim = self._grid_dim(self.sims, len(self.interface))
capture_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppo_offset,
self.d_cdata, time, sd * math.sqrt(2), seed)
self.cdata[...] = self.d_cdata
if cdata is not None:
assert offset < cdata.shape[1]
cap_dim = min(cdata.shape[1] - offset, self.sims)
cdata[:, offset:cap_dim + offset] = self.cdata[:, 0:cap_dim]
self.lst_eat_valid = True
return self.cdata
def reassign(self, time=0.0):
grid_dim = self._grid_dim(self.sims, len(self.interface))
reassign_kernel[grid_dim, self._block_dim](self.d_state, self.d_sat, self.ppi_offset, self.ppo_offset,
self.d_cdata, time)
cuda.synchronize()
def wavecaps(self):
gx = math.ceil(len(self.circuit.lines) / 512)
wavecaps_kernel[gx, 512](self.d_state, self.d_sat, self.sims)
self.sat[...] = self.d_sat
return self.sat[..., 2]
@cuda.jit()
def wavecaps_kernel(state, sat, sims):
idx = cuda.grid(1)
if idx >= len(sat): return
lidx, lcap, _ = sat[idx]
if lidx < 0: return
wcap = 0
for sidx in range(sims):
for tidx in range(lcap):
t = state[lidx + tidx, sidx]
if tidx > wcap:
wcap = tidx
if t >= TMAX: break
sat[idx, 2] = wcap + 1
@cuda.jit()
def reassign_kernel(state, sat, ppi_offset, ppo_offset, cdata, ppi_time):
vector, y = cuda.grid(2)
if vector >= state.shape[-1]: return
if ppo_offset + y >= len(sat): return
ppo, _, _ = sat[ppo_offset + y]
ppi, ppi_cap, _ = sat[ppi_offset + y]
if ppo < 0: return
if ppi < 0: return
ppo_val = int(cdata[y, vector, 1])
ppi_val = int(0)
for tidx in range(ppi_cap):
t = state[ppi + tidx, vector]
if t >= TMAX: break
ppi_val ^= 1
# make new waveform at PPI
toggle = 0
if ppi_val:
state[ppi + toggle, vector] = TMIN
toggle += 1
if ppi_val != ppo_val:
state[ppi + toggle, vector] = ppi_time
toggle += 1
state[ppi + toggle, vector] = TMAX
@cuda.jit()
def capture_kernel(state, sat, ppo_offset, cdata, time, s_sqrt2, seed):
x, y = cuda.grid(2)
if ppo_offset + y >= len(sat): return
line, tdim, _ = sat[ppo_offset + y]
if line < 0: return
if x >= state.shape[-1]: return
vector = x
m = 0.5
acc = 0.0
eat = TMAX
lst = TMIN
tog = 0
ovl = 0
val = int(0)
final = int(0)
for tidx in range(tdim):
t = state[line + tidx, vector]
if t >= TMAX:
if t == TMAX_OVL:
ovl = 1
break
m = -m
final ^= 1
if t < time:
val ^= 1
if t <= TMIN: continue
if s_sqrt2 > 0:
acc += m * (1 + math.erf((t - time) / s_sqrt2))
eat = min(eat, t)
lst = max(lst, t)
tog += 1
if s_sqrt2 > 0:
if m < 0:
acc += 1
if acc >= 0.99:
val = 1
elif acc > 0.01:
seed = (seed << 4) + (vector << 20) + (y << 1)
seed = int(0xDEECE66D) * seed + 0xB
seed = int(0xDEECE66D) * seed + 0xB
rnd = float((seed >> 8) & 0xffffff) / float(1 << 24)
val = rnd < acc
else:
val = 0
else:
acc = val
cdata[y, vector, 0] = acc
cdata[y, vector, 1] = val
cdata[y, vector, 2] = final
cdata[y, vector, 3] = (val != final)
cdata[y, vector, 4] = eat
cdata[y, vector, 5] = lst
cdata[y, vector, 6] = ovl
@cuda.jit()
def assign_kernel(state, sat, ppi_offset, intf_len, tdata, time):
x, y = cuda.grid(2)
if y >= intf_len: return
line = sat[ppi_offset + y, 0]
if line < 0: return
sdim = state.shape[-1]
if x >= sdim: return
vector = x
a0 = tdata[y, 0, vector // 8]
a1 = tdata[y, 1, vector // 8]
a2 = tdata[y, 2, vector // 8]
m = np.uint8(1 << (7 - (vector % 8)))
toggle = 0
if a1 & m:
state[line + toggle, x] = TMIN
toggle += 1
if (a2 & m) and ((a0 & m) != (a1 & m)):
state[line + toggle, x] = time
toggle += 1
state[line + toggle, x] = TMAX
@cuda.jit(device=True)
def rand_gauss_dev(seed, sd):
clamp = 0.5
if sd <= 0.0:
return 1.0
while True:
x = -6.0
for _ in range(12):
seed = int(0xDEECE66D) * seed + 0xB
x += float((seed >> 8) & 0xffffff) / float(1 << 24)
x *= sd
if abs(x) <= clamp:
break
return x + 1.0
@cuda.jit()
def wave_kernel(ops, op_start, op_stop, state, sat, st_start, st_stop, line_times, sdata, sd, seed):
x, y = cuda.grid(2)
st_idx = st_start + x
op_idx = op_start + y
if st_idx >= st_stop: return
if op_idx >= op_stop: return
lut = ops[op_idx, 0]
z_idx = ops[op_idx, 1]
a_idx = ops[op_idx, 2]
b_idx = ops[op_idx, 3]
overflows = int(0)
sdata = sdata[st_idx]
_seed = (seed << 4) + (z_idx << 20) + (st_idx << 1)
a_mem = sat[a_idx, 0]
b_mem = sat[b_idx, 0]
z_mem, z_cap, _ = sat[z_idx]
a_cur = int(0)
b_cur = int(0)
z_cur = lut & 1
if z_cur == 1:
state[z_mem, st_idx] = TMIN
a = state[a_mem, st_idx] + line_times[a_idx, 0, z_cur] * rand_gauss_dev(_seed ^ a_mem ^ z_cur, sd) * sdata[0]
if int(sdata[1]) == a_idx: a += sdata[2+z_cur]
b = state[b_mem, st_idx] + line_times[b_idx, 0, z_cur] * rand_gauss_dev(_seed ^ b_mem ^ z_cur, sd) * sdata[0]
if int(sdata[1]) == b_idx: b += sdata[2+z_cur]
previous_t = TMIN
current_t = min(a, b)
inputs = int(0)
while current_t < TMAX:
z_val = z_cur & 1
if b < a:
b_cur += 1
b = state[b_mem + b_cur, st_idx]
b += line_times[b_idx, 0, z_val ^ 1] * rand_gauss_dev(_seed ^ b_mem ^ z_val ^ 1, sd) * sdata[0]
thresh = line_times[b_idx, 1, z_val] * rand_gauss_dev(_seed ^ b_mem ^ z_val, sd) * sdata[0]
if int(sdata[1]) == b_idx:
b += sdata[2+(z_val^1)]
thresh += sdata[2+z_val]
inputs ^= 2
next_t = b
else:
a_cur += 1
a = state[a_mem + a_cur, st_idx]
a += line_times[a_idx, 0, z_val ^ 1] * rand_gauss_dev(_seed ^ a_mem ^ z_val ^ 1, sd) * sdata[0]
thresh = line_times[a_idx, 1, z_val] * rand_gauss_dev(_seed ^ a_mem ^ z_val, sd) * sdata[0]
if int(sdata[1]) == a_idx:
a += sdata[2+(z_val^1)]
thresh += sdata[2+z_val]
inputs ^= 1
next_t = a
if (z_cur & 1) != ((lut >> inputs) & 1):
# we generate a toggle in z_mem, if:
# ( it is the first toggle in z_mem OR
# following toggle is earlier OR
# pulse is wide enough ) AND enough space in z_mem.
if z_cur == 0 or next_t < current_t or (current_t - previous_t) > thresh:
if z_cur < (z_cap - 1):
state[z_mem + z_cur, st_idx] = current_t
previous_t = current_t
z_cur += 1
else:
overflows += 1
previous_t = state[z_mem + z_cur - 1, st_idx]
z_cur -= 1
else:
z_cur -= 1
if z_cur > 0:
previous_t = state[z_mem + z_cur - 1, st_idx]
else:
previous_t = TMIN
current_t = min(a, b)
if overflows > 0:
state[z_mem + z_cur, st_idx] = TMAX_OVL
else:
state[z_mem + z_cur, st_idx] = a if a > b else b # propagate overflow flags by storing biggest TMAX from input

248
tests/test_wave_sim.py

@ -1,118 +1,148 @@
import numpy as np import numpy as np
from kyupy.wave_sim import WaveSim, WaveSimCuda, wave_eval, TMIN, TMAX from kyupy.wave_sim import WaveSim, WaveSimCuda, wave_eval_cpu, TMIN, TMAX
from kyupy.logic_sim import LogicSim from kyupy.logic_sim import LogicSim
from kyupy import verilog, sdf, logic from kyupy import verilog, sdf, logic, bench
from kyupy.logic import MVArray, BPArray from kyupy.logic import MVArray, BPArray
from kyupy.sim import SimPrim
def test_wave_eval(): def test_nand_delays():
op = (SimPrim.NAND4, 4, 0, 1, 2, 3)
#op = (0b0111, 4, 0, 1)
c = np.full((5*16, 1), TMAX) # 5 waveforms of capacity 16
vat = np.zeros((5, 3), dtype='int')
for i in range(5): vat[i] = i*16, 16, 0 # 1:1 mapping
# SDF specifies IOPATH delays with respect to output polarity # SDF specifies IOPATH delays with respect to output polarity
# SDF pulse rejection value is determined by IOPATH causing last transition and polarity of last transition # SDF pulse rejection value is determined by IOPATH causing last transition and polarity of last transition
line_times = np.zeros((3, 2, 2)) line_times = np.zeros((5, 2, 2))
line_times[0, 0, 0] = 0.1 # A -> Z rise delay line_times[0, 0, 0] = 0.1 # A -> Z rise delay
line_times[0, 0, 1] = 0.2 # A -> Z fall delay line_times[0, 0, 1] = 0.2 # A -> Z fall delay
line_times[0, 1, 0] = 0.1 # A -> Z negative pulse limit (terminate in rising Z) line_times[0, 1, 0] = 0.1 # A -> Z negative pulse limit (terminate in rising Z)
line_times[0, 1, 1] = 0.2 # A -> Z positive pulse limit line_times[0, 1, 1] = 0.2 # A -> Z positive pulse limit
line_times[1, 0, 0] = 0.3 # as above for B -> Z line_times[1, :, 0] = 0.3 # as above for B -> Z
line_times[1, 0, 1] = 0.4 line_times[1, :, 1] = 0.4
line_times[1, 1, 0] = 0.3 line_times[2, :, 0] = 0.5 # as above for C -> Z
line_times[1, 1, 1] = 0.4 line_times[2, :, 1] = 0.6
line_times[3, :, 0] = 0.7 # as above for D -> Z
state = np.zeros((3*16, 1)) + TMAX # 3 waveforms of capacity 16 line_times[3, :, 1] = 0.8
state[::16, 0] = 16 # first entry is capacity
a = state[0:16, 0]
b = state[16:32, 0]
z = state[32:, 0]
sat = np.zeros((3, 3), dtype='int')
sat[0] = 0, 16, 0
sat[1] = 16, 16, 0
sat[2] = 32, 16, 0
sdata = np.asarray([1, -1, 0, 0], dtype='float32') sdata = np.asarray([1, -1, 0, 0], dtype='float32')
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata) def wave_assert(inputs, output):
assert z[0] == TMIN for i, a in zip(inputs, c.reshape(-1,16)): a[:len(i)] = i
wave_eval_cpu(op, c, vat, 0, line_times, sdata)
a[0] = TMIN for i, v in enumerate(output): np.testing.assert_allclose(c.reshape(-1,16)[4,i], v)
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == TMIN wave_assert([[TMAX,TMAX],[TMAX,TMAX],[TMIN,TMAX],[TMIN,TMAX]], [TMIN,TMAX]) # NAND(0,0,1,1) => 1
wave_assert([[TMIN,TMAX],[TMAX,TMAX],[TMIN,TMAX],[TMIN,TMAX]], [TMIN,TMAX]) # NAND(1,0,1,1) => 1
b[0] = TMIN wave_assert([[TMIN,TMAX],[TMIN,TMAX],[TMIN,TMAX],[TMIN,TMAX]], [TMAX]) # NAND(1,1,1,1) => 0
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == TMAX # Keep inputs C=1 and D=1.
wave_assert([[1,TMAX],[2,TMAX]], [TMIN,2.4,TMAX]) # _/⎺⎺⎺ NAND __/⎺⎺ => ⎺⎺⎺\___ (B->Z fall delay)
a[0] = 1 # A _/^^^ wave_assert([[TMIN,TMAX],[TMIN,2,TMAX]], [2.3,TMAX]) # ⎺⎺⎺⎺⎺ NAND ⎺⎺\__ => ___/⎺⎺⎺ (B->Z rise delay)
b[0] = 2 # B __/^^ wave_assert([[TMIN,TMAX],[TMIN,2,2.35,TMAX]], [2.3,2.75,TMAX]) # ⎺⎺⎺⎺⎺ NAND ⎺\_/⎺ => __/⎺⎺\_ (pos pulse, .35@B -> .45@Z)
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata) wave_assert([[TMIN,TMAX],[TMIN,2,2.25,TMAX]], [TMAX]) # ⎺⎺⎺⎺⎺ NAND ⎺\_/⎺ => _______ (pos pulse, .25@B -> .35@Z, filtered)
assert z[0] == TMIN # ^^^\___ B -> Z fall delay wave_assert([[TMIN,TMAX],[2,2.45,TMAX]], [TMIN,2.4,2.75,TMAX]) # ⎺⎺⎺⎺⎺ NAND _/⎺\_ => ⎺⎺\_/⎺⎺ (neg pulse, .45@B -> .35@Z)
assert z[1] == 2.4 wave_assert([[TMIN,TMAX],[2,2.35,TMAX]], [TMIN,TMAX]) # ⎺⎺⎺⎺⎺ NAND _/⎺\_ => ⎺⎺⎺⎺⎺⎺⎺ (neg pulse, .35@B -> .25@Z, filtered)
assert z[2] == TMAX
a[0] = TMIN # A ^^^^^^ def test_tiny_circuit():
b[0] = TMIN # B ^^^\__ c = bench.parse('input(x, y) output(a, o, n) a=and(x,y) o=or(x,y) n=not(x)')
b[1] = 2 lt = np.zeros((len(c.lines), 2, 2))
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata) lt[:,0,:] = 1.0 # unit delay for all lines
assert z[0] == 2.3 # ___/^^^ B -> Z rise delay wsim = WaveSim(c, lt)
assert z[1] == TMAX assert len(wsim.s) == 5
# pos pulse of 0.35 at B -> 0.45 after delays # values for x
a[0] = TMIN # A ^^^^^^^^ wsim.s[0,0,:3] = 0, 0.1, 0
b[0] = TMIN wsim.s[0,1,:3] = 0, 0.2, 1
b[1] = 2 # B ^^\__/^^ wsim.s[0,2,:3] = 1, 0.3, 0
b[2] = 2.35 wsim.s[0,3,:3] = 1, 0.4, 1
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == 2.3 # __/^^\__ # values for y
assert z[1] == 2.75 wsim.s[1,0,:3] = 1, 0.5, 0
assert z[2] == TMAX wsim.s[1,1,:3] = 1, 0.6, 0
wsim.s[1,2,:3] = 1, 0.7, 0
# neg pulse of 0.45 at B -> 0.35 after delays wsim.s[1,3,:3] = 0, 0.8, 1
a[0] = TMIN # A ^^^^^^^^
b[0] = 2 # B __/^^\__ wsim.s_to_c()
b[1] = 2.45
b[2] = TMAX x_c_loc = wsim.vat[wsim.ppi_offset+0, 0] # check x waveforms
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata) np.testing.assert_allclose(wsim.c[x_c_loc:x_c_loc+3, 0], [TMAX, TMAX, TMAX])
assert z[0] == TMIN # ^^\__/^^ np.testing.assert_allclose(wsim.c[x_c_loc:x_c_loc+3, 1], [0.2, TMAX, TMAX])
assert z[1] == 2.4 np.testing.assert_allclose(wsim.c[x_c_loc:x_c_loc+3, 2], [TMIN, 0.3, TMAX])
assert z[2] == 2.75 np.testing.assert_allclose(wsim.c[x_c_loc:x_c_loc+3, 3], [TMIN, TMAX, TMAX])
assert z[3] == TMAX
y_c_loc = wsim.vat[wsim.ppi_offset+1, 0] # check y waveforms
# neg pulse of 0.35 at B -> 0.25 after delays (filtered) np.testing.assert_allclose(wsim.c[y_c_loc:y_c_loc+3, 0], [TMIN, 0.5, TMAX])
a[0] = TMIN # A ^^^^^^^^ np.testing.assert_allclose(wsim.c[y_c_loc:y_c_loc+3, 1], [TMIN, 0.6, TMAX])
b[0] = 2 # B __/^^\__ np.testing.assert_allclose(wsim.c[y_c_loc:y_c_loc+3, 2], [TMIN, 0.7, TMAX])
b[1] = 2.35 np.testing.assert_allclose(wsim.c[y_c_loc:y_c_loc+3, 3], [0.8, TMAX, TMAX])
b[2] = TMAX
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata) wsim.c_prop()
assert z[0] == TMIN # ^^^^^^
assert z[1] == TMAX a_c_loc = wsim.vat[wsim.ppo_offset+2, 0] # check a waveforms
np.testing.assert_allclose(wsim.c[a_c_loc:a_c_loc+3, 0], [TMAX, TMAX, TMAX])
# pos pulse of 0.25 at B -> 0.35 after delays (filtered) np.testing.assert_allclose(wsim.c[a_c_loc:a_c_loc+3, 1], [1.2, 1.6, TMAX])
a[0] = TMIN # A ^^^^^^^^ np.testing.assert_allclose(wsim.c[a_c_loc:a_c_loc+3, 2], [TMIN, 1.3, TMAX])
b[0] = TMIN np.testing.assert_allclose(wsim.c[a_c_loc:a_c_loc+3, 3], [1.8, TMAX, TMAX])
b[1] = 2 # B ^^\__/^^
b[2] = 2.25 o_c_loc = wsim.vat[wsim.ppo_offset+3, 0] # check o waveforms
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata) np.testing.assert_allclose(wsim.c[o_c_loc:o_c_loc+3, 0], [TMIN, 1.5, TMAX])
assert z[0] == TMAX # ______ np.testing.assert_allclose(wsim.c[o_c_loc:o_c_loc+3, 1], [TMIN, TMAX, TMAX])
np.testing.assert_allclose(wsim.c[o_c_loc:o_c_loc+3, 2], [TMIN, 1.7, TMAX])
np.testing.assert_allclose(wsim.c[o_c_loc:o_c_loc+3, 3], [TMIN, TMAX, TMAX])
def compare_to_logic_sim(wsim):
tests = MVArray((len(wsim.interface), wsim.sims)) n_c_loc = wsim.vat[wsim.ppo_offset+4, 0] # check n waveforms
np.testing.assert_allclose(wsim.c[n_c_loc:n_c_loc+3, 0], [TMIN, TMAX, TMAX])
np.testing.assert_allclose(wsim.c[n_c_loc:n_c_loc+3, 1], [TMIN, 1.2, TMAX])
np.testing.assert_allclose(wsim.c[n_c_loc:n_c_loc+3, 2], [1.3, TMAX, TMAX])
np.testing.assert_allclose(wsim.c[n_c_loc:n_c_loc+3, 3], [TMAX, TMAX, TMAX])
wsim.c_to_s()
# check a captures
np.testing.assert_allclose(wsim.s[2, 0, 3:7], [0, TMAX, TMIN, 0])
np.testing.assert_allclose(wsim.s[2, 1, 3:7], [0, 1.2, 1.6, 0])
np.testing.assert_allclose(wsim.s[2, 2, 3:7], [1, 1.3, 1.3, 0])
np.testing.assert_allclose(wsim.s[2, 3, 3:7], [0, 1.8, 1.8, 1])
# check o captures
np.testing.assert_allclose(wsim.s[3, 0, 3:7], [1, 1.5, 1.5, 0])
np.testing.assert_allclose(wsim.s[3, 1, 3:7], [1, TMAX, TMIN, 1])
np.testing.assert_allclose(wsim.s[3, 2, 3:7], [1, 1.7, 1.7, 0])
np.testing.assert_allclose(wsim.s[3, 3, 3:7], [1, TMAX, TMIN, 1])
# check o captures
np.testing.assert_allclose(wsim.s[4, 0, 3:7], [1, TMAX, TMIN, 1])
np.testing.assert_allclose(wsim.s[4, 1, 3:7], [1, 1.2, 1.2, 0])
np.testing.assert_allclose(wsim.s[4, 2, 3:7], [0, 1.3, 1.3, 1])
np.testing.assert_allclose(wsim.s[4, 3, 3:7], [0, TMAX, TMIN, 0])
def compare_to_logic_sim(wsim: WaveSim):
tests = MVArray((len(wsim.s_nodes), wsim.sims))
choices = np.asarray([logic.ZERO, logic.ONE, logic.RISE, logic.FALL], dtype=np.uint8) choices = np.asarray([logic.ZERO, logic.ONE, logic.RISE, logic.FALL], dtype=np.uint8)
rng = np.random.default_rng(10) rng = np.random.default_rng(10)
tests.data[...] = rng.choice(choices, tests.data.shape) tests.data[...] = rng.choice(choices, tests.data.shape)
tests_bp = BPArray(tests)
wsim.assign(tests_bp)
wsim.propagate()
cdata = wsim.capture()
resp = MVArray(tests) wsim.s[:, :, 0] = (tests.data & 2) >> 1
wsim.s[:, :, 3] = (tests.data & 2) >> 1
wsim.s[:, :, 1] = 0.0
wsim.s[:, :, 2] = tests.data & 1
wsim.s[:, :, 6] = tests.data & 1
wsim.s_to_c()
wsim.c_prop()
wsim.c_to_s()
for iidx, inode in enumerate(wsim.interface): resp = MVArray(tests)
if len(inode.ins) > 0: resp.data[...] = np.array(wsim.s[:, :, 6], dtype=np.uint8) | (np.array(wsim.s[:, :, 3], dtype=np.uint8)<<1)
for vidx in range(wsim.sims): resp.data |= ((resp.data ^ (resp.data >> 1)) & 1) << 2 # transitions
resp.data[iidx, vidx] = logic.ZERO if cdata[iidx, vidx, 0] < 0.5 else logic.ONE
# resp.set_value(vidx, iidx, 0 if cdata[iidx, vidx, 0] < 0.5 else 1)
tests_bp = BPArray(tests)
lsim = LogicSim(wsim.circuit, len(tests_bp)) lsim = LogicSim(wsim.circuit, len(tests_bp))
lsim.assign(tests_bp) lsim.assign(tests_bp)
lsim.propagate() lsim.propagate()
@ -121,30 +151,18 @@ def compare_to_logic_sim(wsim):
exp = MVArray(exp_bp) exp = MVArray(exp_bp)
for i in range(8): for i in range(8):
exp_str = exp[i].replace('R', '1').replace('F', '0').replace('P', '0').replace('N', '1') exp_str = exp[i].replace('P', '0').replace('N', '1')
res_str = resp[i].replace('R', '1').replace('F', '0').replace('P', '0').replace('N', '1') res_str = resp[i].replace('P', '0').replace('N', '1')
assert res_str == exp_str assert res_str == exp_str
def test_b14(mydir): def test_b14(b14_circuit, b14_timing):
c = verilog.load(mydir / 'b14.v.gz', branchforks=True) compare_to_logic_sim(WaveSim(b14_circuit, b14_timing, 8))
df = sdf.load(mydir / 'b14.sdf.gz')
lt = df.annotation(c)
wsim = WaveSim(c, lt, 8)
compare_to_logic_sim(wsim)
def test_b14_strip_forks(mydir): def test_b14_strip_forks(b14_circuit, b14_timing):
c = verilog.load(mydir / 'b14.v.gz', branchforks=True) compare_to_logic_sim(WaveSim(b14_circuit, b14_timing, 8, strip_forks=True))
df = sdf.load(mydir / 'b14.sdf.gz')
lt = df.annotation(c)
wsim = WaveSim(c, lt, 8, strip_forks=True)
compare_to_logic_sim(wsim)
def test_b14_cuda(mydir): def test_b14_cuda(b14_circuit, b14_timing):
c = verilog.load(mydir / 'b14.v.gz', branchforks=True) compare_to_logic_sim(WaveSimCuda(b14_circuit, b14_timing, 8, strip_forks=True))
df = sdf.load(mydir / 'b14.sdf.gz')
lt = df.annotation(c)
wsim = WaveSimCuda(c, lt, 8)
compare_to_logic_sim(wsim)

166
tests/test_wave_sim4.py

@ -1,166 +0,0 @@
import numpy as np
from kyupy.wave_sim4 import WaveSim, WaveSimCuda, wave_eval_cpu, TMIN, TMAX
from kyupy.logic_sim import LogicSim
from kyupy import verilog, sdf, logic, bench
from kyupy.logic import MVArray, BPArray
from kyupy.sim import SimPrim
def test_nand_delays():
op = (SimPrim.NAND4, 4, 0, 1, 2, 3)
#op = (0b0111, 4, 0, 1)
c = np.full((5*16, 1), TMAX) # 5 waveforms of capacity 16
vat = np.zeros((5, 3), dtype='int')
for i in range(5): vat[i] = i*16, 16, 0 # 1:1 mapping
# SDF specifies IOPATH delays with respect to output polarity
# SDF pulse rejection value is determined by IOPATH causing last transition and polarity of last transition
line_times = np.zeros((5, 2, 2))
line_times[0, 0, 0] = 0.1 # A -> Z rise delay
line_times[0, 0, 1] = 0.2 # A -> Z fall delay
line_times[0, 1, 0] = 0.1 # A -> Z negative pulse limit (terminate in rising Z)
line_times[0, 1, 1] = 0.2 # A -> Z positive pulse limit
line_times[1, :, 0] = 0.3 # as above for B -> Z
line_times[1, :, 1] = 0.4
line_times[2, :, 0] = 0.5 # as above for C -> Z
line_times[2, :, 1] = 0.6
line_times[3, :, 0] = 0.7 # as above for D -> Z
line_times[3, :, 1] = 0.8
sdata = np.asarray([1, -1, 0, 0], dtype='float32')
def wave_assert(inputs, output):
for i, a in zip(inputs, c.reshape(-1,16)): a[:len(i)] = i
wave_eval_cpu(op, c, vat, 0, line_times, sdata)
for i, v in enumerate(output): np.testing.assert_allclose(c.reshape(-1,16)[4,i], v)
wave_assert([[TMAX,TMAX],[TMAX,TMAX],[TMIN,TMAX],[TMIN,TMAX]], [TMIN,TMAX]) # NAND(0,0,1,1) => 1
wave_assert([[TMIN,TMAX],[TMAX,TMAX],[TMIN,TMAX],[TMIN,TMAX]], [TMIN,TMAX]) # NAND(1,0,1,1) => 1
wave_assert([[TMIN,TMAX],[TMIN,TMAX],[TMIN,TMAX],[TMIN,TMAX]], [TMAX]) # NAND(1,1,1,1) => 0
# Keep inputs C=1 and D=1.
wave_assert([[1,TMAX],[2,TMAX]], [TMIN,2.4,TMAX]) # _/⎺⎺⎺ NAND __/⎺⎺ => ⎺⎺⎺\___ (B->Z fall delay)
wave_assert([[TMIN,TMAX],[TMIN,2,TMAX]], [2.3,TMAX]) # ⎺⎺⎺⎺⎺ NAND ⎺⎺\__ => ___/⎺⎺⎺ (B->Z rise delay)
wave_assert([[TMIN,TMAX],[TMIN,2,2.35,TMAX]], [2.3,2.75,TMAX]) # ⎺⎺⎺⎺⎺ NAND ⎺\_/⎺ => __/⎺⎺\_ (pos pulse, .35@B -> .45@Z)
wave_assert([[TMIN,TMAX],[TMIN,2,2.25,TMAX]], [TMAX]) # ⎺⎺⎺⎺⎺ NAND ⎺\_/⎺ => _______ (pos pulse, .25@B -> .35@Z, filtered)
wave_assert([[TMIN,TMAX],[2,2.45,TMAX]], [TMIN,2.4,2.75,TMAX]) # ⎺⎺⎺⎺⎺ NAND _/⎺\_ => ⎺⎺\_/⎺⎺ (neg pulse, .45@B -> .35@Z)
wave_assert([[TMIN,TMAX],[2,2.35,TMAX]], [TMIN,TMAX]) # ⎺⎺⎺⎺⎺ NAND _/⎺\_ => ⎺⎺⎺⎺⎺⎺⎺ (neg pulse, .35@B -> .25@Z, filtered)
def test_tiny_circuit():
c = bench.parse('input(x, y) output(a, o, n) a=and(x,y) o=or(x,y) n=not(x)')
lt = np.zeros((len(c.lines), 2, 2))
lt[:,0,:] = 1.0 # unit delay for all lines
wsim = WaveSim(c, lt)
assert len(wsim.s) == 5
# values for x
wsim.s[0,0,:3] = 0, 0.1, 0
wsim.s[0,1,:3] = 0, 0.2, 1
wsim.s[0,2,:3] = 1, 0.3, 0
wsim.s[0,3,:3] = 1, 0.4, 1
# values for y
wsim.s[1,0,:3] = 1, 0.5, 0
wsim.s[1,1,:3] = 1, 0.6, 0
wsim.s[1,2,:3] = 1, 0.7, 0
wsim.s[1,3,:3] = 0, 0.8, 1
wsim.s_to_c()
x_c_loc = wsim.vat[wsim.ppi_offset+0, 0] # check x waveforms
np.testing.assert_allclose(wsim.c[x_c_loc:x_c_loc+3, 0], [TMAX, TMAX, TMAX])
np.testing.assert_allclose(wsim.c[x_c_loc:x_c_loc+3, 1], [0.2, TMAX, TMAX])
np.testing.assert_allclose(wsim.c[x_c_loc:x_c_loc+3, 2], [TMIN, 0.3, TMAX])
np.testing.assert_allclose(wsim.c[x_c_loc:x_c_loc+3, 3], [TMIN, TMAX, TMAX])
y_c_loc = wsim.vat[wsim.ppi_offset+1, 0] # check y waveforms
np.testing.assert_allclose(wsim.c[y_c_loc:y_c_loc+3, 0], [TMIN, 0.5, TMAX])
np.testing.assert_allclose(wsim.c[y_c_loc:y_c_loc+3, 1], [TMIN, 0.6, TMAX])
np.testing.assert_allclose(wsim.c[y_c_loc:y_c_loc+3, 2], [TMIN, 0.7, TMAX])
np.testing.assert_allclose(wsim.c[y_c_loc:y_c_loc+3, 3], [0.8, TMAX, TMAX])
wsim.c_prop()
a_c_loc = wsim.vat[wsim.ppo_offset+2, 0] # check a waveforms
np.testing.assert_allclose(wsim.c[a_c_loc:a_c_loc+3, 0], [TMAX, TMAX, TMAX])
np.testing.assert_allclose(wsim.c[a_c_loc:a_c_loc+3, 1], [1.2, 1.6, TMAX])
np.testing.assert_allclose(wsim.c[a_c_loc:a_c_loc+3, 2], [TMIN, 1.3, TMAX])
np.testing.assert_allclose(wsim.c[a_c_loc:a_c_loc+3, 3], [1.8, TMAX, TMAX])
o_c_loc = wsim.vat[wsim.ppo_offset+3, 0] # check o waveforms
np.testing.assert_allclose(wsim.c[o_c_loc:o_c_loc+3, 0], [TMIN, 1.5, TMAX])
np.testing.assert_allclose(wsim.c[o_c_loc:o_c_loc+3, 1], [TMIN, TMAX, TMAX])
np.testing.assert_allclose(wsim.c[o_c_loc:o_c_loc+3, 2], [TMIN, 1.7, TMAX])
np.testing.assert_allclose(wsim.c[o_c_loc:o_c_loc+3, 3], [TMIN, TMAX, TMAX])
n_c_loc = wsim.vat[wsim.ppo_offset+4, 0] # check n waveforms
np.testing.assert_allclose(wsim.c[n_c_loc:n_c_loc+3, 0], [TMIN, TMAX, TMAX])
np.testing.assert_allclose(wsim.c[n_c_loc:n_c_loc+3, 1], [TMIN, 1.2, TMAX])
np.testing.assert_allclose(wsim.c[n_c_loc:n_c_loc+3, 2], [1.3, TMAX, TMAX])
np.testing.assert_allclose(wsim.c[n_c_loc:n_c_loc+3, 3], [TMAX, TMAX, TMAX])
wsim.c_to_s()
# check a captures
np.testing.assert_allclose(wsim.s[2, 0, 3:7], [0, TMAX, TMIN, 0])
np.testing.assert_allclose(wsim.s[2, 1, 3:7], [0, 1.2, 1.6, 0])
np.testing.assert_allclose(wsim.s[2, 2, 3:7], [1, 1.3, 1.3, 0])
np.testing.assert_allclose(wsim.s[2, 3, 3:7], [0, 1.8, 1.8, 1])
# check o captures
np.testing.assert_allclose(wsim.s[3, 0, 3:7], [1, 1.5, 1.5, 0])
np.testing.assert_allclose(wsim.s[3, 1, 3:7], [1, TMAX, TMIN, 1])
np.testing.assert_allclose(wsim.s[3, 2, 3:7], [1, 1.7, 1.7, 0])
np.testing.assert_allclose(wsim.s[3, 3, 3:7], [1, TMAX, TMIN, 1])
# check o captures
np.testing.assert_allclose(wsim.s[4, 0, 3:7], [1, TMAX, TMIN, 1])
np.testing.assert_allclose(wsim.s[4, 1, 3:7], [1, 1.2, 1.2, 0])
np.testing.assert_allclose(wsim.s[4, 2, 3:7], [0, 1.3, 1.3, 1])
np.testing.assert_allclose(wsim.s[4, 3, 3:7], [0, TMAX, TMIN, 0])
def compare_to_logic_sim(wsim: WaveSim):
tests = MVArray((len(wsim.s_nodes), wsim.sims))
choices = np.asarray([logic.ZERO, logic.ONE, logic.RISE, logic.FALL], dtype=np.uint8)
rng = np.random.default_rng(10)
tests.data[...] = rng.choice(choices, tests.data.shape)
wsim.s[..., 0] = (tests.data & 2) >> 1
wsim.s[..., 3] = (tests.data & 2) >> 1
wsim.s[..., 1] = 0.0
wsim.s[..., 2] = tests.data & 1
wsim.s[..., 6] = tests.data & 1
wsim.s_to_c()
wsim.c_prop()
wsim.c_to_s()
resp = MVArray(tests)
resp.data[...] = wsim.s[..., 6].astype(np.uint8) | (wsim.s[..., 3].astype(np.uint8)<<1)
resp.data |= ((resp.data ^ (resp.data >> 1)) & 1) << 2 # transitions
tests_bp = BPArray(tests)
lsim = LogicSim(wsim.circuit, len(tests_bp))
lsim.assign(tests_bp)
lsim.propagate()
exp_bp = BPArray(tests_bp)
lsim.capture(exp_bp)
exp = MVArray(exp_bp)
for i in range(8):
exp_str = exp[i].replace('P', '0').replace('N', '1')
res_str = resp[i].replace('P', '0').replace('N', '1')
assert res_str == exp_str
def test_b14(b14_circuit, b14_timing):
compare_to_logic_sim(WaveSim(b14_circuit, b14_timing, 8))
def test_b14_strip_forks(b14_circuit, b14_timing):
compare_to_logic_sim(WaveSim(b14_circuit, b14_timing, 8, strip_forks=True))
def test_b14_cuda(b14_circuit, b14_timing):
compare_to_logic_sim(WaveSimCuda(b14_circuit, b14_timing, 8, strip_forks=True))

138
tests/test_wave_sim_old.py

@ -0,0 +1,138 @@
import numpy as np
from kyupy.wave_sim_old import WaveSim, WaveSimCuda, wave_eval, TMIN, TMAX
from kyupy.logic_sim import LogicSim
from kyupy import verilog, sdf, logic
from kyupy.logic import MVArray, BPArray
def test_wave_eval():
# SDF specifies IOPATH delays with respect to output polarity
# SDF pulse rejection value is determined by IOPATH causing last transition and polarity of last transition
line_times = np.zeros((3, 2, 2))
line_times[0, 0, 0] = 0.1 # A -> Z rise delay
line_times[0, 0, 1] = 0.2 # A -> Z fall delay
line_times[0, 1, 0] = 0.1 # A -> Z negative pulse limit (terminate in rising Z)
line_times[0, 1, 1] = 0.2 # A -> Z positive pulse limit
line_times[1, 0, 0] = 0.3 # as above for B -> Z
line_times[1, 0, 1] = 0.4
line_times[1, 1, 0] = 0.3
line_times[1, 1, 1] = 0.4
state = np.zeros((3*16, 1)) + TMAX # 3 waveforms of capacity 16
state[::16, 0] = 16 # first entry is capacity
a = state[0:16, 0]
b = state[16:32, 0]
z = state[32:, 0]
sat = np.zeros((3, 3), dtype='int')
sat[0] = 0, 16, 0
sat[1] = 16, 16, 0
sat[2] = 32, 16, 0
sdata = np.asarray([1, -1, 0, 0], dtype='float32')
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == TMIN
a[0] = TMIN
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == TMIN
b[0] = TMIN
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == TMAX
a[0] = 1 # A _/^^^
b[0] = 2 # B __/^^
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == TMIN # ^^^\___ B -> Z fall delay
assert z[1] == 2.4
assert z[2] == TMAX
a[0] = TMIN # A ^^^^^^
b[0] = TMIN # B ^^^\__
b[1] = 2
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == 2.3 # ___/^^^ B -> Z rise delay
assert z[1] == TMAX
# pos pulse of 0.35 at B -> 0.45 after delays
a[0] = TMIN # A ^^^^^^^^
b[0] = TMIN
b[1] = 2 # B ^^\__/^^
b[2] = 2.35
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == 2.3 # __/^^\__
assert z[1] == 2.75
assert z[2] == TMAX
# neg pulse of 0.45 at B -> 0.35 after delays
a[0] = TMIN # A ^^^^^^^^
b[0] = 2 # B __/^^\__
b[1] = 2.45
b[2] = TMAX
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == TMIN # ^^\__/^^
assert z[1] == 2.4
assert z[2] == 2.75
assert z[3] == TMAX
# neg pulse of 0.35 at B -> 0.25 after delays (filtered)
a[0] = TMIN # A ^^^^^^^^
b[0] = 2 # B __/^^\__
b[1] = 2.35
b[2] = TMAX
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == TMIN # ^^^^^^
assert z[1] == TMAX
# pos pulse of 0.25 at B -> 0.35 after delays (filtered)
a[0] = TMIN # A ^^^^^^^^
b[0] = TMIN
b[1] = 2 # B ^^\__/^^
b[2] = 2.25
wave_eval((0b0111, 2, 0, 1), state, sat, 0, line_times, sdata)
assert z[0] == TMAX # ______
def compare_to_logic_sim(wsim):
tests = MVArray((len(wsim.interface), wsim.sims))
choices = np.asarray([logic.ZERO, logic.ONE, logic.RISE, logic.FALL], dtype=np.uint8)
rng = np.random.default_rng(10)
tests.data[...] = rng.choice(choices, tests.data.shape)
tests_bp = BPArray(tests)
wsim.assign(tests_bp)
wsim.propagate()
cdata = wsim.capture()
resp = MVArray(tests)
for iidx, inode in enumerate(wsim.interface):
if len(inode.ins) > 0:
for vidx in range(wsim.sims):
resp.data[iidx, vidx] = logic.ZERO if cdata[iidx, vidx, 0] < 0.5 else logic.ONE
# resp.set_value(vidx, iidx, 0 if cdata[iidx, vidx, 0] < 0.5 else 1)
lsim = LogicSim(wsim.circuit, len(tests_bp))
lsim.assign(tests_bp)
lsim.propagate()
exp_bp = BPArray(tests_bp)
lsim.capture(exp_bp)
exp = MVArray(exp_bp)
for i in range(8):
exp_str = exp[i].replace('R', '1').replace('F', '0').replace('P', '0').replace('N', '1')
res_str = resp[i].replace('R', '1').replace('F', '0').replace('P', '0').replace('N', '1')
assert res_str == exp_str
def test_b14(b14_circuit, b14_timing):
compare_to_logic_sim(WaveSim(b14_circuit, b14_timing, 8))
def test_b14_strip_forks(b14_circuit, b14_timing):
compare_to_logic_sim(WaveSim(b14_circuit, b14_timing, 8, strip_forks=True))
def test_b14_cuda(b14_circuit, b14_timing):
compare_to_logic_sim(WaveSimCuda(b14_circuit, b14_timing, 8, strip_forks=True))
Loading…
Cancel
Save