1 changed files with 210 additions and 0 deletions
@ -0,0 +1,210 @@
@@ -0,0 +1,210 @@
|
||||
"""A simple parser for Verilog Change Dump (VCD) files. |
||||
|
||||
This parser loads the changes into a ndarray of logic values. |
||||
Axis 0 is the time step, axis 1 is the variable data. |
||||
All variable values are flattened. Offsets are stored in each variable metadata. |
||||
""" |
||||
|
||||
from collections import namedtuple |
||||
from dataclasses import dataclass |
||||
|
||||
from lark import Lark, Transformer |
||||
import numpy as np |
||||
|
||||
from . import readtext, logic |
||||
|
||||
|
||||
@dataclass |
||||
class Var: |
||||
type: str |
||||
width: int |
||||
idcode: str |
||||
reference: str |
||||
scope: 'Scope' |
||||
offset: int = 0 |
||||
|
||||
|
||||
class Scope: |
||||
def __init__(self, parent, name='', type='module') -> None: |
||||
self.name = name |
||||
self.type = type |
||||
self.parent_scope = parent |
||||
self.sub_scopes : list[Scope] = [] |
||||
self.vars : list = [] |
||||
|
||||
@property |
||||
def path(self) -> str: |
||||
parts = [] |
||||
s = self |
||||
while s is not None and s.name: |
||||
parts.append(s.name) |
||||
s = s.parent_scope |
||||
return '/'.join(reversed(parts)) |
||||
|
||||
|
||||
class VcdHeader: |
||||
def __init__(self) -> None: |
||||
self.comment: str = '' |
||||
self.date: str = '' |
||||
self.timescale: str = '' |
||||
self.version: str = '' |
||||
self.root_scope = Scope(None) |
||||
self.current_scope = self.root_scope |
||||
|
||||
def scope(self, name, type): |
||||
new_scope = Scope(self.current_scope, name, type) |
||||
self.current_scope.sub_scopes.append(new_scope) |
||||
self.current_scope = new_scope |
||||
|
||||
def upscope(self): |
||||
assert self.current_scope.parent_scope is not None, "root scope has no parent" |
||||
self.current_scope = self.current_scope.parent_scope |
||||
|
||||
def add_var(self, type: str, width: int, idcode: str, reference: str): |
||||
self.current_scope.vars.append(Var(type, width, idcode, reference, self.current_scope)) |
||||
|
||||
def __repr__(self): |
||||
lines = [ |
||||
'VcdHeader(', |
||||
f' comment = {self.comment!r}', |
||||
f' date = {self.date!r}', |
||||
f' timescale = {self.timescale!r}', |
||||
f' version = {self.version!r}', |
||||
' scopes:', |
||||
] |
||||
def fmt_scope(scope, indent=4): |
||||
lines.append(f'{" " * (indent // 2)}{scope.type}:{scope.name} ({len(scope.vars)} vars)') |
||||
for sub in scope.sub_scopes: |
||||
fmt_scope(sub, indent + 2) |
||||
for sub in self.root_scope.sub_scopes: |
||||
fmt_scope(sub) |
||||
lines.append(')') |
||||
return '\n'.join(lines) |
||||
|
||||
|
||||
class VcdVarMap: |
||||
def __init__(self, header: VcdHeader, filter = lambda var: True) -> None: |
||||
self.var_list: list[Var] = [] |
||||
def collect(scope): |
||||
self.var_list.extend(v for v in scope.vars if filter(v)) |
||||
for sub in scope.sub_scopes: |
||||
collect(sub) |
||||
collect(header.root_scope) |
||||
offset = 0 |
||||
for var in self.var_list: |
||||
var.offset = offset |
||||
offset += var.width |
||||
self.total_width = offset |
||||
self.idcode2var = {var.idcode: var for var in self.var_list} |
||||
|
||||
|
||||
class VcdData: |
||||
def __init__(self, var_map, steps, data): |
||||
self.var_map = var_map |
||||
self.steps = steps |
||||
self.data = data |
||||
|
||||
|
||||
class VcdHeaderTransformer(Transformer): |
||||
def __init__(self): |
||||
super().__init__() |
||||
self.header = VcdHeader() |
||||
|
||||
def comment(self, args): self.header.comment = args[0].value.strip() |
||||
def date(self, args): self.header.date = args[0].value.strip() |
||||
def timescale(self, args): self.header.timescale = args[0].value.strip() |
||||
def version(self, args): self.header.version = args[0].value.strip() |
||||
|
||||
def scope(self, args): |
||||
type, name = args |
||||
self.header.scope(name.value, type.value) |
||||
|
||||
def upscope(self, args): |
||||
self.header.upscope() |
||||
|
||||
def var(self, args): |
||||
type, width, idcode, reference = args |
||||
reference = reference.strip() |
||||
width = int(width) |
||||
self.header.add_var(type, width, idcode, reference) |
||||
|
||||
def start(self, args): |
||||
return self.header |
||||
#return tuple(args) if len(args) > 1 else (args[0], None) |
||||
|
||||
|
||||
GRAMMAR = r""" |
||||
start: (_declaration_command)* |
||||
_declaration_command: ( comment | date | scope | timescale | upscope | var | version ) "$end" |
||||
comment: "$comment" TEXT? |
||||
date: "$date" TEXT |
||||
scope: "$scope" /[^\s]+/ /[^\s]+/ |
||||
timescale: "$timescale" TEXT |
||||
upscope: "$upscope" |
||||
var: "$var" /[^\s]+/ /[^\s]+/ /[^\s]+/ TEXT |
||||
version: "$version" TEXT |
||||
TEXT: /(?:(?!\$end)[\s\S])+/ |
||||
%ignore ( /\r?\n/ )+ |
||||
%ignore /[\t \f]+/ |
||||
""" |
||||
|
||||
|
||||
def load(file, var_filter = lambda var: True, step_filter = lambda time, values, var_map: True): |
||||
"""Parses the contents of ``file`` as Verilog Change Dump (VCD). |
||||
|
||||
:param file: A file name or a file handle. Files with `.gz`-suffix are decompressed on-the-fly. |
||||
:param var_filter: A callback function to include only certain variables in the output. |
||||
:param step_filter: A callback function to include only certain steps in the output. |
||||
:return: A VcdData object with metadata and an ndarray with all values. |
||||
""" |
||||
vcd = readtext(file) |
||||
header_size = vcd.find('$enddefinitions') |
||||
assert header_size > 0, "invalid VCD file: end of header not found" |
||||
vcd_header_str = vcd[:header_size] |
||||
vcd_header : VcdHeader = Lark(GRAMMAR, parser="lalr", lexer='contextual', transformer=VcdHeaderTransformer()).parse(vcd_header_str) # type: ignore |
||||
vcd_data = vcd[header_size:].splitlines() |
||||
var_map = VcdVarMap(vcd_header, var_filter) |
||||
|
||||
chunk_size = 10240 |
||||
chunks = [] |
||||
chunk = np.full((chunk_size, var_map.total_width), logic.UNASSIGNED, dtype=np.uint8) |
||||
|
||||
_val_map = {'0': logic.ZERO, '1': logic.ONE, |
||||
'x': logic.UNKNOWN, 'X': logic.UNKNOWN, |
||||
'z': logic.UNASSIGNED, 'Z': logic.UNASSIGNED} |
||||
_pad_char = {'0': '0', '1': '0', 'x': 'x', 'X': 'x', 'z': 'z', 'Z': 'z'} |
||||
|
||||
current_time = None |
||||
steps = [] |
||||
step_idx = 0 |
||||
for line in vcd_data: |
||||
if not line or line[0] == '$': |
||||
continue |
||||
if line[0] == '#': |
||||
if step_filter(current_time, chunk[step_idx], var_map): |
||||
step_idx += 1 |
||||
steps.append(current_time) |
||||
if step_idx >= chunk_size: |
||||
chunks.append(chunk) |
||||
chunk = np.empty((chunk_size, var_map.total_width), dtype=np.uint8) |
||||
chunk[0] = chunks[-1][-1] |
||||
step_idx = 0 |
||||
else: |
||||
chunk[step_idx] = chunk[step_idx - 1] |
||||
current_time = int(line[1:]) |
||||
continue |
||||
if line[0] in ('b', 'B'): |
||||
value_str, idcode = line[1:].split(None, 1) |
||||
else: |
||||
value_str, idcode = line[0], line[1:] |
||||
var = var_map.idcode2var.get(idcode) |
||||
if var is None: |
||||
continue |
||||
if len(value_str) < var.width: |
||||
value_str = _pad_char.get(value_str[0], '0') * (var.width - len(value_str)) + value_str |
||||
for i, c in enumerate(value_str): |
||||
chunk[step_idx, var.offset + var.width - 1 - i] = _val_map.get(c, logic.UNKNOWN) |
||||
|
||||
chunks.append(chunk[:step_idx]) |
||||
data = np.concatenate(chunks, axis=0) |
||||
return VcdData(var_map, steps, data) |
||||
Loading…
Reference in new issue