Skip to content
Snippets Groups Projects
Commit 0cd3c0c1 authored by Florian Spreckelsen's avatar Florian Spreckelsen
Browse files

ENH: Support list of directories for crawler_main and scan_directory

parent 444f9831
No related branches found
No related tags found
2 merge requests!217TST: Make NamedTemporaryFiles Windows-compatible,!208ENH: Allow crawler_main to operate on a list of paths
......@@ -621,7 +621,7 @@ one with the entities that need to be updated and the other with entities to be
crawled_data: Optional[list[db.Record]] = None,
no_insert_RTs: Optional[list[str]] = None,
no_update_RTs: Optional[list[str]] = None,
path_for_authorized_run: Optional[str] = "",
path_for_authorized_run: Optional[Union[str, list[str]]] = "",
):
"""
This function applies several stages:
......@@ -643,7 +643,7 @@ one with the entities that need to be updated and the other with entities to be
no_update_RTs : list[str], optional
list of RecordType names. Records that have one of those RecordTypes
as parent will not be updated
path_for_authorized_run : str, optional
path_for_authorized_run : str or list[str], optional
only used if there are changes that need authorization before being
applied. The form for rerunning the crawler with the authorization
of these changes will be generated with this path. See
......@@ -718,11 +718,21 @@ one with the entities that need to be updated and the other with entities to be
update_cache = UpdateCache()
pending_inserts = update_cache.get_inserts(self.run_id)
if pending_inserts:
if isinstance(path_for_authorized_run, list):
raise NotImplementedError(
"Authorization of inserts is currently implemented only for single paths, "
"not for lists of paths."
)
Crawler.inform_about_pending_changes(
pending_inserts, self.run_id, path_for_authorized_run)
pending_updates = update_cache.get_updates(self.run_id)
if pending_updates:
if isinstance(path_for_authorized_run, list):
raise NotImplementedError(
"Authorization of updates is currently implemented only for single paths, "
"not for lists of paths."
)
Crawler.inform_about_pending_changes(
pending_updates, self.run_id, path_for_authorized_run)
......@@ -1004,7 +1014,7 @@ def _store_dry_run_data(ins, upd):
"update": updates}))
def crawler_main(crawled_directory_path: str,
def crawler_main(crawled_directory_path: Union[str, list[str]],
cfood_file_name: str,
identifiables_definition_file: Optional[str] = None,
debug: bool = False,
......@@ -1022,8 +1032,8 @@ def crawler_main(crawled_directory_path: str,
Parameters
----------
crawled_directory_path : str
path to be crawled
crawled_directory_path : str or list[str]
path(s) to be crawled
cfood_file_name : str
filename of the cfood to be used
identifiables_definition_file : str
......
......@@ -421,7 +421,7 @@ def scanner(items: list[StructureElement],
# --------------------------------------------------------------------------------
def scan_directory(dirname: str, crawler_definition_path: str,
def scan_directory(dirname: Union[str, list[str]], crawler_definition_path: str,
restricted_path: Optional[list[str]] = None,
debug_tree: Optional[DebugTree] = None):
""" Crawl a single directory.
......@@ -434,10 +434,12 @@ def scan_directory(dirname: str, crawler_definition_path: str,
Parameters
----------
dirname: str or list[str]
directory or list of directories to be scanned
restricted_path: optional, list of strings
Traverse the data tree only along the given path. When the end of the given path
is reached, traverse the full tree as normal. See docstring of 'scanner' for
more details.
Traverse the data tree only along the given path. When the end
of the given path is reached, traverse the full tree as
normal. See docstring of 'scanner' for more details.
Returns
-------
......@@ -455,26 +457,31 @@ def scan_directory(dirname: str, crawler_definition_path: str,
if not dirname:
raise ValueError(
"You have to provide a non-empty path for crawling.")
dir_structure_name = os.path.basename(dirname)
# TODO: needs to be covered somewhere else
crawled_directory = dirname
if not dir_structure_name and dirname.endswith('/'):
if dirname == '/':
# Crawling the entire file system
dir_structure_name = "root"
else:
# dirname had a trailing '/'
dir_structure_name = os.path.basename(dirname[:-1])
return scan_structure_elements(Directory(dir_structure_name,
dirname),
crawler_definition,
converter_registry,
restricted_path=restricted_path,
debug_tree=debug_tree,
registered_transformer_functions=registered_transformer_functions
)
if not isinstance(dirname, list):
dirname = [dirname]
dir_element_list = []
for dname in dirname:
dir_structure_name = os.path.basename(dname)
# TODO: needs to be covered somewhere else
crawled_directory = dname
if not dir_structure_name and dname.endswith('/'):
if dname == '/':
# Crawling the entire file system
dir_structure_name = "root"
else:
# dname had a trailing '/'
dir_structure_name = os.path.basename(dname[:-1])
dir_element_list.append(Directory(dir_structure_name, dname))
return scan_structure_elements(
dir_element_list,
crawler_definition,
converter_registry,
restricted_path=restricted_path,
debug_tree=debug_tree,
registered_transformer_functions=registered_transformer_functions
)
def scan_structure_elements(items: Union[list[StructureElement], StructureElement],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment