# -*- coding: utf-8 -*-
#
# This file is part of the pyFDA project hosted at https://github.com/chipmuenk/pyfda
#
# Copyright © pyFDA Project Contributors
# Licensed under the terms of the MIT License
# (see file LICENSE in root directory for details)
"""
Create the tree dictionaries containing information about filters,
filter implementations, widgets etc. in hierarchical form
"""
import os, sys, re, ast
from collections import OrderedDict
from pprint import pformat
import importlib
import configparser
import pyfda.filterbroker as fb
import pyfda.filter_factory as ff
import pyfda.libs.pyfda_dirs as dirs
from .frozendict import freeze_hierarchical
import logging
logger = logging.getLogger(__name__)
# --------------------------------------------------------------------------
[docs]
def merge_dicts_hierarchically(d1, d2, path=None, mode='keep1'):
"""
Merge the hierarchical dictionaries ``d1`` and ``d2``. The dict ``d1`` is
modified in place and returned
Parameters
----------
d1 : dict
hierarchical dictionary 1
d2 : dict
hierarchical dictionary 2
mode : str
Select the behaviour when the same key is present in both dictionaries:
* :'keep1': keep the entry from ``d1`` (default)
* :'keep2': keep the entry from ``d2``
* :'add1': merge the entries, putting the values from ``d2`` first
(important for lists)
* :'add2': merge the entries, putting the values from ``d1`` first ( " )
path : str
internal parameter for keeping track of hierarchy during recursive calls,
it should not be set by the user
Returns
-------
d1 : dict
a reference to the first dictionary, merged-in-place.
Example
-------
>>> merge_dicts_hierarchically(fil_tree, fil_tree_add, mode='add1')
Notes
-----
If you don't want to modify ``d1`` in place, call the function using:
>>> new_dict = merge_dicts_hierarchically(dict(d1), d2)
If you need to merge more than two dicts use:
>>> from functools import reduce # only for py3
>>> reduce(merge, [d1, d2, d3...]) # add / merge all other dicts into d1
Taken with some modifications from:
http://stackoverflow.com/questions/7204805/dictionaries-of-dictionaries-merge
"""
if not(isinstance(d1, dict) and isinstance(d2, dict)):
# at least one of the arguments is not a dict -> don't do anything
return d1
if path is None:
path = ""
for key in d2:
if key in d1:
if isinstance(d1[key], dict) and isinstance(d2[key], dict):
# both entries are dicts, recurse one level deeper:
merge_dicts_hierarchically(d1[key], d2[key], path=path + str(key), mode=mode)
# TODO: elif <either d1[key] OR d2[key] is not a dict> -> exception
elif d1[key] == d2[key] or mode == 'keep1':
pass # keep item in dict1, discard item with same key in dict1
elif mode == 'keep2':
d1[key] = d2[key] # replace item in dict1 by item in dict2
else:
try:
if mode == 'add2':
if (isinstance(d1[key], tuple) and
isinstance(d2[key], tuple)):
d1[key] = (d2[key][0], d2[key][1] + d1[key][1])
else:
d1[key] = d2[key] + d1[key]
elif mode == 'add1':
if (isinstance(d1[key], tuple) and
isinstance(d2[key], tuple)):
d1[key] = (d1[key][0], d1[key][1] + d2[key][1])
else:
d1[key] = d1[key] + d2[key]
else:
logger.warning("Unknown merge mode {0}.".format(mode))
except Exception as e:
logger.warning(
f"Merge conflict at {path + str(key)}: {e}")
else:
d1[key] = d2[key] # add new entry to dict1
return d1
[docs]
class ParseError(Exception):
pass
[docs]
class Tree_Builder(object):
"""
Read the config file and construct dictionary trees with
- all filter combinations
- valid combinations of filter widgets and fixpoint implementations
"""
def __init__(self):
logger.debug("Config file: {0:s}\n".format(dirs.USER_CONF_DIR_FILE))
self.REQ_VERSION = 4 # required version for config file
self.parse_conf_file()
self.init_filters()
# --------------------------------------------------------------------------
[docs]
def init_filters(self):
"""
Run at startup to populate global dictionaries and lists:
- Read attributes (`ft`, `rt`, `fo`) from all valid filter classes (`fc`)
in the global dict ``fb.filter_classes`` and store them in the filter
tree dict ``fil_tree`` with the hierarchy
**rt-ft-fc-fo-subwidget:params** .
Parameters
----------
None
Returns
-------
None, but populates the following global attributes:
- `fb.fil_tree` :
"""
# self.parse_conf_file()
fil_tree = {}
for fc in fb.filter_classes: # iterate over all previously found filter
# classes fc
# instantiate a global instance ff.fil_inst() of filter class fc
err_code = ff.fil_factory.create_fil_inst(fc)
if err_code > 0:
logger.warning(
'Skipping filter class "{0:s}" due to import error {1:d}'
.format(fc, err_code))
continue # continue with next entry in fb.filter_classes
# add attributes from dict to fil_tree for filter class fc
fil_tree = self.build_fil_tree(fc, ff.fil_inst.rt_dict, fil_tree)
# merge additional rt_dict (optional) into filter tree
if hasattr(ff.fil_inst, 'rt_dict_add'):
fil_tree_add = self.build_fil_tree(fc, ff.fil_inst.rt_dict_add)
merge_dicts_hierarchically(fil_tree, fil_tree_add, mode='add1')
# Make the dictionary and all sub-dictionaries read-only ("FrozenDict"):
fb.fil_tree = freeze_hierarchical(fil_tree)
# Test Immutatbility
# fil_tree_ref = fb.fil_tree['LP']['FIR']['Equiripple']['min']
# fil_tree_ref.update({'msg':("hallo",)}) # this changes fb.fil_tree !!
# fb.fil_tree['LP']['FIR']['Equiripple']['min']['par'] = ("A_1","F_1")
# print(type(fb.fil_tree['LP']['FIR']['Equiripple']))
logger.debug("\nfb.fil_tree =\n%s", pformat(fb.fil_tree))
# --------------------------------------------------------------------------
[docs]
def parse_conf_file(self):
"""
Parse the configuration file `pyfda.conf` (specified in
``dirs.USER_CONF_DIR_FILE``). This is run only once at instantiation.
This is performed using :func:`build_class_dict()` which calls
:func:`parse_conf_section()`:
- Try to find and import the modules specified in the corresponding sections
- Extract and import the classes defined in each module and give back an
OrderedDict with the successfully imported classes and their options
(like fully qualified module names, display name, associated fixpoint
widgets etc.).
- Information for each section is stored in globally
accessible OrderdDicts like`fb.filter_classes`.
The following sections are analyzed:
:[Commons]:
Try to find user directories; if they exist add them to
`dirs.USER_DIRS` and `sys.path`
For the other sections, OrderedDicts are returned with the class names
as keys and dictionaries with options as values.
:[Input Widgets]:
Store (user) input widgets in `fb.input_classes`
:[Plot Widgets]:
Store (user) plot widgets in `fb.plot_classes`
:[Filter Widgets]:
Store (user) filter widgets in `fb.filter_classes`
:[Fixpoint Widgets]:
Store (user) fixpoint widgets in `fb.fixpoint_classes`
Parameters
----------
None
Returns
-------
None, but `self.conf` contains the parsed configuration file.
"""
def read_conf_file():
self.conf.clear()
self.conf.read(dirs.USER_CONF_DIR_FILE)
sect = ""
for s in self.conf.sections():
sect += "\t\t[" + str(s) + "]\n"
logger.info("Parsing config file\n\t'{0}' with sections:\n{1}"
.format(dirs.USER_CONF_DIR_FILE, sect))
# -----------------
def read_conf_version():
"""
Try to read out the version of the config file, if the version
number cannot be read or is not equal to the required number,
return False.
"""
success = True
try:
conf_ver = int(self.commons['version'][0])
if conf_ver != self.REQ_VERSION:
logger.error(
"User config file\n\t'{conf_file:s}'\n\thas the wrong version "
"'{conf_ver}' (required: '{req_version}')."
.format(conf_file=dirs.USER_CONF_DIR_FILE, conf_ver=conf_ver,
req_version=self.REQ_VERSION))
success = False
except KeyError:
logger.error("No entry 'version' in {0}".format(dirs.USER_CONF_DIR_FILE))
success = False
except (IndexError, ValueError, TypeError):
logger.error(
f"No suitable value for 'version' in {dirs.USER_CONF_DIR_FILE}")
success = False
return success
# --------------
# ========= Starting here! =============================================
try:
# Test whether user config file is readable, this is necessary as
# configParser quietly fails when the file doesn't exist
if not os.access(dirs.USER_CONF_DIR_FILE, os.R_OK):
raise IOError(
f'Config file "{dirs.USER_CONF_DIR_FILE}"')
# -----------------------------------------------------------------
# setup an instance of config parser, allow keys without value
# -----------------------------------------------------------------
# preserve case of parsed options by overriding optionxform():
self.conf = configparser.ConfigParser(allow_no_value=True)
# Set it to function str()
self.conf.optionxform = str
# Allow interpolation across sections, ${Dirs:dir1}
self.conf._interpolation = configparser.ExtendedInterpolation()
read_conf_file()
# ------------------------------------------------------------------
# Parsing [Common]
# ------------------------------------------------------------------
self.commons = self.parse_conf_section("Common")
logger.info("Found {0} entries in [Common]".format(len(self.commons)))
if not read_conf_version():
dirs.update_conf_files(logger)
read_conf_file()
self.commons = self.parse_conf_section("Common")
logger.info(
f"Found {len(self.commons)} entries in [Common] (new config file)")
if not read_conf_version():
logger.critical("Version number is still invalid, terminating.")
sys.exit()
if 'user_dirs' in self.commons:
for d in self.commons['user_dirs']:
d = os.path.abspath(os.path.normpath(d))
if os.path.isdir(d):
dirs.USER_DIRS.append(d)
if d not in sys.path:
sys.path.append(d)
else:
logger.warning("User directory doesn't exist:\n\t{0}\n".format(d))
if dirs.USER_DIRS:
logger.info("User directory(s):\n\t{0}\n".format(dirs.USER_DIRS))
else:
logger.info("No valid user directory specified.")
# ------------------------------------------------------------------
# Parsing [Input Widgets]
# ------------------------------------------------------------------
fb.input_classes = self.build_class_dict("Input Widgets", "input_widgets")
# ------------------------------------------------------------------
# Parsing [Plot Widgets]
# ------------------------------------------------------------------
fb.plot_classes = self.build_class_dict("Plot Widgets", "plot_widgets")
# ------------------------------------------------------------------
# Parsing [Filter Widgets]
# ------------------------------------------------------------------
fb.filter_classes = self.build_class_dict("Filter Widgets", "filter_widgets")
# currently, option "opt" can only be an association with a fixpoint
# widget, so replace key "opt" by key "fix":
# Convert to list in any case
for c in fb.filter_classes:
if 'opt' in fb.filter_classes[c]:
fb.filter_classes[c]['fix'] = fb.filter_classes[c].pop('opt')
if 'fix' in fb.filter_classes[c] and\
type(fb.filter_classes[c]['fix']) == str:
fb.filter_classes[c]['fix'] = fb.filter_classes[c]['fix'].split(',')
# ------------------------------------------------------------------
# Parsing [Fixpoint Filters]
# ------------------------------------------------------------------
fb.fixpoint_classes = self.build_class_dict(
"Fixpoint Widgets", "fixpoint_widgets")
# First check whether fixpoint options of the filter widgets are
# valid fixpoint classes by comparing them to the verified items of
# fb.fixpoint_classes:
for c in fb.filter_classes:
if 'fix' in fb.filter_classes[c]:
for w in fb.filter_classes[c]['fix']:
if w not in fb.fixpoint_classes:
logger.warning(
f'Removing invalid fixpoint module\n\t"{w}" '
f'for filter class "{c}".')
fb.filter_classes[c]['fix'].remove(w)
# merge fb.filter_classes info "filter class":[fx_class1, fx_class2]
# and fb.fixpoint_classes info "fixpoint class":[fil_class1, fil_class2]
# into the fb.filter_classes dict
# collect all fixpoint widgets (keys in fb.fixpoint_classes) which
# have the class name c as a value
fix_wdg = {k for k, val in fb.fixpoint_classes.items() if c in val['opt']}
if len(fix_wdg) > 0:
if 'fix' in fb.filter_classes[c]:
# ... and merge it with the fixpoint options of class c
fix_wdg = fix_wdg.union(fb.filter_classes[c]['fix'])
fb.filter_classes[c].update({'fix': list(fix_wdg)})
# ----- Exceptions ----------------------
except configparser.DuplicateSectionError as e:
logger.critical('Duplicate section in config file '
f'"{dirs.USER_CONF_DIR_FILE}":\n{e}.')
sys.exit()
except configparser.ParsingError as e:
logger.critical('Parsing error in config file "{0}:\n{1}".'
.format(dirs.USER_CONF_DIR_FILE, e))
sys.exit()
except configparser.Error as e:
logger.critical(f'{e} in config file "{dirs.USER_CONF_DIR_FILE}".')
sys.exit()
# --------------------------------------------------------------------------
[docs]
def parse_conf_section(self, section):
"""
Parse ``section`` in config file `conf` and return an OrderedDict
with the elements ``{key:<OPTION>}`` where `key` and <OPTION>
have been read from the config file. <OPTION> has been sanitized and
converted to a list or a dict.
Parameters
----------
section : str
name of the section to be parsed
Returns
-------
section_conf_dict : dict
Ordered dict with the keys of the config files and corresponding values
"""
try:
section_conf_dict = OrderedDict()
# get entries from config file with [name, path]
items_list = self.conf.items(section)
if len(items_list) > 0:
for i in items_list:
# sanitize value and convert to a list, split at \n and ,
val = i[1].strip(' \t\n\r[]"')
if len(i[1]) == 0:
val = ""
elif i[1][0] == '{': # try to convert to dict
try:
val = ast.literal_eval(val)
except SyntaxError as e:
logger.warning(f"Syntax Error in config file\n{e}")
val = ""
else:
val = re.sub('["\'\[\]]','', val)
val = re.split('; |, |\n|,\n|\r', val) # TODO: Test
section_conf_dict.update({i[0]: val})
logger.debug('Found {0:2d} entries in [{1:s}].'
.format(len(section_conf_dict), section))
else:
logger.warning('Empty section [{0:s}].'.format(section))
except configparser.NoSectionError:
logger.warning('No section [{0:s}] in config file "{1:s}".'
.format(section, dirs.USER_CONF_DIR_FILE))
# configparser.NoOptionError
except configparser.DuplicateOptionError as e:
logger.warning('{0} in config file "{1}".'.format(e, dirs.USER_CONF_DIR_FILE))
except configparser.InterpolationMissingOptionError as e:
# catch unresolvable interpolations like ${wrongSection:wrongOption}
# Attention: This terminates current section() without result!
logger.warning('{0} in config file "{1}".'.format(e, dirs.USER_CONF_DIR_FILE))
return section_conf_dict
# --------------------------------------------------------------------------
[docs]
def build_class_dict(self, section, subpackage=""):
"""
- Try to dynamically import the modules (= files) parsed in `section`
reading their module level attribute `classes` listing the classes
contained in the module.
When `classes` is a dictionary, e.g. `{"Cheby":"Chebyshev 1"}` where
the key is the class name in the module and the value the corresponding
display name (used for the combo box).
- When `classes` is a string or a list, use the string resp. the list items
for both class and display name.
- Try to import the filter classes
Parameters
----------
section: str
Name of the section in the configuration file to be parsed by
``self.parse_conf_section``.
subpackage: str
Name of the subpackage containing the module to be imported. Module
names are prepended successively with
`['pyfda.' + subpackage + '.', '', subpackage + '.']`
Returns
-------
classes_dict : dict
A dictionary with the classes as keys; values are dicts which define
the options (like display name, module path, fixpoint implementations etc).
Each entry has the form e.g.
{<class name>:{'name':<display name>, 'mod':<full module name>}} e.g.
.. code-block:: python
{'Cheby1':{'name':'Chebyshev 1',
'mod':'pyfda.filter_design.cheby1',
'fix': 'IIR_cascade',
'opt': ["option1", "option2"]}
"""
classes_dict = OrderedDict() # dict for all successfully imported classes
num_imports = 0 # number of successful module imports
imported_classes = "" # names of successful module imports
pckg_names = ['pyfda.'+subpackage+'.', '', subpackage+'.'] # search in that order
section_conf_dict = self.parse_conf_section(section)
for mod_name in section_conf_dict: # iterate over dict keys found in config file
for p in pckg_names:
try: # Try to import the module from the package list above
mod_fq_name = p + mod_name # fully qualified module name (fqn)
# Try to import the module from the package and get a handle:
logger.debug(mod_fq_name)
################################################
mod = importlib.import_module(mod_fq_name)
################################################
break # -> successful import, break out of pckg_names loop
except ImportError as e:
logger.debug(f'Import error for "{mod_fq_name}":\n{e}')
mod_fq_name = None
continue # module not found, try next package
except Exception as e:
logger.warning(f'Error during import of "{mod_fq_name}":\n{e}')
mod_fq_name = None
continue # Some other error ocurred during import, try next package
if not mod_fq_name:
logger.warning(f'Module "{mod_name}" could not be imported.')
continue
if hasattr(mod, 'classes'):
# check type of module attribute 'classes', try to convert to dict
if isinstance(mod.classes, dict): # dict {class name : combo box name}
mod_dict = mod.classes # one or more filter classes in one file
elif isinstance(mod.classes, str): # String, create a dict with the
mod_dict = {mod.classes: mod.classes} # string as both key and value
elif isinstance(mod.classes, list): # list, create a dict with list items
mod_dict = {l: l for l in list} # as both key and value
else:
logger.warning(
f"Skipping module '{mod_name}', its attribute 'classes' "
f"has the wrong type '{type(mod.classes).__name__}'.")
continue # with next entry in section_conf_dict
# logger.info("MOD_DICT: {0}".format(mod_dict))
else:
# no `classes` attribute - skip entry
logger.warning(
f'Skipping module "{mod_name}" due to missing attribute "classes".')
continue
# Now, check whether class `c` is part of module `mod`
for c in mod_dict:
if not hasattr(mod, c): # class c doesn't exist in module
logger.warning(
f"Skipping class '{c}', it doesn't exist in "
f"module '{mod_fq_name}'.")
continue # continue with next entry in classes_dict
else:
classes_dict.update(
{c: {'name': mod_dict[c], # Class name
'mod': mod_fq_name}}) # Fully qualified module name
# when module + class import was successful, add a new entry
# to the dict with the class name as key and a dict containing
# "name":display name and "mod":fully qualified module name as values,
# e.g. 'Butter':{'name':'Butterworth',
# 'mod':'pyfda.filter_design.butter'}
# check whether options have been defined in the config file
opt = section_conf_dict[mod_name]
if opt:
if type(opt) == dict:
classes_dict[c].update(opt)
elif type(opt) in {str, list}: # create dict {'opt':<OPTION>}
classes_dict[c].update({"opt": opt})
else:
logger.warning(
f'Class "{c}" option data type "{type(opt).__name__}" '
f'not understood:\n "{opt}"')
# logger.info("Opt : {0}".format(classes_dict[c]))
num_imports += 1
imported_classes += "\t" + mod_fq_name + "." + c + "\n"
if num_imports < 1:
logger.warning("No class could be imported.")
else:
logger.info("Found {0:d} classes in [{1:s}]:\n{2:s}"
.format(num_imports, section, imported_classes))
logger.debug(classes_dict)
return classes_dict
# --------------------------------------------------------------------------
[docs]
def build_fil_tree(self, fc, rt_dict, fil_tree=None):
"""
Read attributes (ft, rt, rt:fo) from filter class fc)
Attributes are stored in
the design method classes in the format (example from ``common.py``)
.. code-block:: python
self.ft = 'IIR'
self.rt_dict = {
'LP': {'man':{'fo': ('a','N'),
'msg': ('a', r"<br /><b>Note:</b> Read this!"),
'fspecs': ('a','F_C'),
'tspecs': ('u', {'frq':('u','F_PB','F_SB'),
'amp':('u','A_PB','A_SB')})
},
'min':{'fo': ('d','N'),
'fspecs': ('d','F_C'),
'tspecs': ('a', {'frq':('a','F_PB','F_SB'),
'amp':('a','A_PB','A_SB')})
}
},
'HP': {'man':{'fo': ('a','N'),
'fspecs': ('a','F_C'),
'tspecs': ('u', {'frq':('u','F_SB','F_PB'),
'amp':('u','A_SB','A_PB')})
},
'min':{'fo': ('d','N'),
'fspecs': ('d','F_C'),
'tspecs': ('a', {'frq':('a','F_SB','F_PB'),
'amp':('a','A_SB','A_PB')})
}
}
}
Build a dictionary of all filter combinations with the following hierarchy:
response types -> filter types -> filter classes -> filter order
rt (e.g. 'LP') ft (e.g. 'IIR') fc (e.g. 'cheby1') fo ('min' or 'man')
All attributes found for fc are arranged in a dict, e.g.
for ``cheby1.LPman`` and ``cheby1.LPmin``, listing the parameters to be
displayed and whether they are active, unused, disabled or invisible for
each subwidget:
.. code-block:: python
'LP':{
'IIR':{
'Cheby1':{
'man':{'fo': ('a','N'),
'msg': ('a', r"<br /><b>Note:</b> Read this!"),
'fspecs': ('a','F_C'),
'tspecs': ('u', {'frq':('u','F_PB','F_SB'),
'amp':('u','A_PB','A_SB')})
},
'min':{'fo': ('d','N'),
'fspecs': ('d','F_C'),
'tspecs': ('a', {'frq':('a','F_PB','F_SB'),
'amp':('a','A_PB','A_SB')})
}
}
}
}, ...
Finally, the whole structure is frozen recursively to avoid inadvertedly
changing the filter tree.
For a full example, see the default filter tree ``fb.fil_tree`` defined
in ``filterbroker.py``.
Parameters
----------
None
Returns
-------
dict
filter tree
"""
if not fil_tree:
fil_tree = {}
ft = ff.fil_inst.ft # get filter type (e.g. 'FIR')
for rt in rt_dict: # iterate over all response types
if rt == 'COM': # handle common info later
continue
if rt not in fil_tree: # is response type already in dict?
fil_tree.update({rt: {}}) # no, create it
if ft not in fil_tree[rt]: # filter type already in dict[rt]?
fil_tree[rt].update({ft: {}}) # no, create it
if fc not in fil_tree[rt][ft]: # filter class already in dict[rt][ft]?
fil_tree[rt][ft].update({fc: {}}) # no, create it
# now append all the individual 'min' / 'man' subwidget infos to fc:
fil_tree[rt][ft][fc].update(rt_dict[rt])
if 'COM' in rt_dict: # Now handle common info
for fo in rt_dict[rt]: # iterate over 'min' / 'max'
if fo in rt_dict['COM']: # and add common info first
merge_dicts_hierarchically(fil_tree[rt][ft][fc][fo],
rt_dict['COM'][fo], mode='add2')
return fil_tree
# ==============================================================================
if __name__ == "__main__":
# Need to start a QApplication to avoid the error
# "QWidget: Must construct a QApplication before a QPaintDevice"
# when instantiating filters with dynamic widgets (equiripple, firwin)
from .compat import QApplication
app = QApplication(sys.argv)
print("===== Initialize FilterReader ====")
filt_file_name = "filter_list.txt"
conf_dir = "filter_design"
# Create a new FilterFileReader instance & initialize it
myTreeBuilder = Tree_Builder(conf_dir, filt_file_name)
print("\n===== Start Test ====")
filterTree = myTreeBuilder.build_fil_tree()
print('fb.fil_tree = ', fb.fil_tree)