From f975765bbba1e2a46db35559a7e908353e3403f8 Mon Sep 17 00:00:00 2001 From: stefan Date: Wed, 6 May 2026 23:34:48 +0900 Subject: [PATCH] first version of a vcd parser --- src/kyupy/vcd.py | 210 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 src/kyupy/vcd.py diff --git a/src/kyupy/vcd.py b/src/kyupy/vcd.py new file mode 100644 index 0000000..0411101 --- /dev/null +++ b/src/kyupy/vcd.py @@ -0,0 +1,210 @@ +"""A simple parser for Verilog Change Dump (VCD) files. + +This parser loads the changes into a ndarray of logic values. +Axis 0 is the time step, axis 1 is the variable data. +All variable values are flattened. Offsets are stored in each variable metadata. +""" + +from collections import namedtuple +from dataclasses import dataclass + +from lark import Lark, Transformer +import numpy as np + +from . import readtext, logic + + +@dataclass +class Var: + type: str + width: int + idcode: str + reference: str + scope: 'Scope' + offset: int = 0 + + +class Scope: + def __init__(self, parent, name='', type='module') -> None: + self.name = name + self.type = type + self.parent_scope = parent + self.sub_scopes : list[Scope] = [] + self.vars : list = [] + + @property + def path(self) -> str: + parts = [] + s = self + while s is not None and s.name: + parts.append(s.name) + s = s.parent_scope + return '/'.join(reversed(parts)) + + +class VcdHeader: + def __init__(self) -> None: + self.comment: str = '' + self.date: str = '' + self.timescale: str = '' + self.version: str = '' + self.root_scope = Scope(None) + self.current_scope = self.root_scope + + def scope(self, name, type): + new_scope = Scope(self.current_scope, name, type) + self.current_scope.sub_scopes.append(new_scope) + self.current_scope = new_scope + + def upscope(self): + assert self.current_scope.parent_scope is not None, "root scope has no parent" + self.current_scope = self.current_scope.parent_scope + + def add_var(self, type: str, width: int, idcode: str, reference: str): + self.current_scope.vars.append(Var(type, width, idcode, reference, self.current_scope)) + + def __repr__(self): + lines = [ + 'VcdHeader(', + f' comment = {self.comment!r}', + f' date = {self.date!r}', + f' timescale = {self.timescale!r}', + f' version = {self.version!r}', + ' scopes:', + ] + def fmt_scope(scope, indent=4): + lines.append(f'{" " * (indent // 2)}{scope.type}:{scope.name} ({len(scope.vars)} vars)') + for sub in scope.sub_scopes: + fmt_scope(sub, indent + 2) + for sub in self.root_scope.sub_scopes: + fmt_scope(sub) + lines.append(')') + return '\n'.join(lines) + + +class VcdVarMap: + def __init__(self, header: VcdHeader, filter = lambda var: True) -> None: + self.var_list: list[Var] = [] + def collect(scope): + self.var_list.extend(v for v in scope.vars if filter(v)) + for sub in scope.sub_scopes: + collect(sub) + collect(header.root_scope) + offset = 0 + for var in self.var_list: + var.offset = offset + offset += var.width + self.total_width = offset + self.idcode2var = {var.idcode: var for var in self.var_list} + + +class VcdData: + def __init__(self, var_map, steps, data): + self.var_map = var_map + self.steps = steps + self.data = data + + +class VcdHeaderTransformer(Transformer): + def __init__(self): + super().__init__() + self.header = VcdHeader() + + def comment(self, args): self.header.comment = args[0].value.strip() + def date(self, args): self.header.date = args[0].value.strip() + def timescale(self, args): self.header.timescale = args[0].value.strip() + def version(self, args): self.header.version = args[0].value.strip() + + def scope(self, args): + type, name = args + self.header.scope(name.value, type.value) + + def upscope(self, args): + self.header.upscope() + + def var(self, args): + type, width, idcode, reference = args + reference = reference.strip() + width = int(width) + self.header.add_var(type, width, idcode, reference) + + def start(self, args): + return self.header + #return tuple(args) if len(args) > 1 else (args[0], None) + + +GRAMMAR = r""" + start: (_declaration_command)* + _declaration_command: ( comment | date | scope | timescale | upscope | var | version ) "$end" + comment: "$comment" TEXT? + date: "$date" TEXT + scope: "$scope" /[^\s]+/ /[^\s]+/ + timescale: "$timescale" TEXT + upscope: "$upscope" + var: "$var" /[^\s]+/ /[^\s]+/ /[^\s]+/ TEXT + version: "$version" TEXT + TEXT: /(?:(?!\$end)[\s\S])+/ + %ignore ( /\r?\n/ )+ + %ignore /[\t \f]+/ + """ + + +def load(file, var_filter = lambda var: True, step_filter = lambda time, values, var_map: True): + """Parses the contents of ``file`` as Verilog Change Dump (VCD). + + :param file: A file name or a file handle. Files with `.gz`-suffix are decompressed on-the-fly. + :param var_filter: A callback function to include only certain variables in the output. + :param step_filter: A callback function to include only certain steps in the output. + :return: A VcdData object with metadata and an ndarray with all values. + """ + vcd = readtext(file) + header_size = vcd.find('$enddefinitions') + assert header_size > 0, "invalid VCD file: end of header not found" + vcd_header_str = vcd[:header_size] + vcd_header : VcdHeader = Lark(GRAMMAR, parser="lalr", lexer='contextual', transformer=VcdHeaderTransformer()).parse(vcd_header_str) # type: ignore + vcd_data = vcd[header_size:].splitlines() + var_map = VcdVarMap(vcd_header, var_filter) + + chunk_size = 10240 + chunks = [] + chunk = np.full((chunk_size, var_map.total_width), logic.UNASSIGNED, dtype=np.uint8) + + _val_map = {'0': logic.ZERO, '1': logic.ONE, + 'x': logic.UNKNOWN, 'X': logic.UNKNOWN, + 'z': logic.UNASSIGNED, 'Z': logic.UNASSIGNED} + _pad_char = {'0': '0', '1': '0', 'x': 'x', 'X': 'x', 'z': 'z', 'Z': 'z'} + + current_time = None + steps = [] + step_idx = 0 + for line in vcd_data: + if not line or line[0] == '$': + continue + if line[0] == '#': + if step_filter(current_time, chunk[step_idx], var_map): + step_idx += 1 + steps.append(current_time) + if step_idx >= chunk_size: + chunks.append(chunk) + chunk = np.empty((chunk_size, var_map.total_width), dtype=np.uint8) + chunk[0] = chunks[-1][-1] + step_idx = 0 + else: + chunk[step_idx] = chunk[step_idx - 1] + current_time = int(line[1:]) + continue + if line[0] in ('b', 'B'): + value_str, idcode = line[1:].split(None, 1) + else: + value_str, idcode = line[0], line[1:] + var = var_map.idcode2var.get(idcode) + if var is None: + continue + if len(value_str) < var.width: + value_str = _pad_char.get(value_str[0], '0') * (var.width - len(value_str)) + value_str + for i, c in enumerate(value_str): + chunk[step_idx, var.offset + var.width - 1 - i] = _val_map.get(c, logic.UNKNOWN) + + chunks.append(chunk[:step_idx]) + data = np.concatenate(chunks, axis=0) + return VcdData(var_map, steps, data) \ No newline at end of file