# Copyright (c) 2025, TU Wien
# of Geodesy and Geoinformation (GEO).
# All rights reserved.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL VIENNA UNIVERSITY OF TECHNOLOGY,
# DEPARTMENT OF GEODESY AND GEOINFORMATION BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
SGRT folder and file name definition.
"""
import os
import copy
import datetime as dt
from datetime import datetime
from collections import OrderedDict
from geopathfinder.folder_naming import build_smarttree
from geopathfinder.folder_naming import create_smartpath
from geopathfinder.file_naming import SmartFilename
# Please add here new sensors if they follow the SGRT naming convention.
allowed_sensor_dirs = [
'Sentinel-1_CSAR', 'SCATSAR', 'METOP_ASCAT', 'Envisat_ASAR'
]
[docs]
class SgrtFilename(SmartFilename):
"""
SGRT file name definition using SmartFilename class.
"""
fields_def = OrderedDict([('pflag', {
'len': 1,
'delim': ''
}), ('dtime_1', {
'len': 8
}), ('dtime_2', {
'len': 8
}), ('var_name', {
'len': 9
}), ('mission_id', {
'len': 2,
'delim': ''
}), ('spacecraft_id', {
'len': 1,
'delim': ''
}), ('mode_id', {
'len': 2,
'delim': ''
}), ('product_type', {
'len': 3,
'delim': ''
}), ('res_class', {
'len': 1,
'delim': ''
}), ('level', {
'len': 1,
'delim': ''
}), ('pol', {
'len': 2,
'delim': ''
}), ('orbit_direction', {
'len': 1
}), ('relative_orbit', {
'len': 3
}), ('workflow_id', {
'len': 5
}), ('grid_name', {
'len': 6
}), ('tile_name', {
'len': 10
})])
pad = "-"
delimiter = "_"
def __init__(self, fields, ext=".tif", convert=False, compact=False):
"""
Constructor of SgrtFilename class.
Parameters
----------
fields: dict
Dictionary specifying the different parts of the filename.
convert: bool, optional
If true, decoding is applied to parts of the filename, where such an operation is available (default is False).
"""
self.date_format = "%Y%m%d"
self.time_format = "%H%M%S"
fields = fields.copy()
if 'dtime_2' not in fields.keys():
self.single_date = True
apply_dtime_2 = False
else:
self.single_date = False
apply_dtime_2 = True
if isinstance(fields['dtime_2'], str):
if fields['dtime_2'].endswith('--'):
self.single_date = True
else:
if isinstance(fields['dtime_2'],
dt.time) or (fields['dtime_2'].year < 1950):
self.single_date = True
if 'dtime_1' in fields.keys():
if self.single_date:
date = self.encode_date(fields['dtime_1'])
if apply_dtime_2:
time = self.encode_time(fields['dtime_2'])
else:
time = self.encode_time(fields['dtime_1'])
fields['dtime_1'] = date
fields['dtime_2'] = time
else:
fields['dtime_1'] = self.encode_date(fields['dtime_1'])
fields['dtime_2'] = self.encode_date(fields['dtime_2'])
fields_def_ext = copy.deepcopy(SgrtFilename.fields_def)
fields_def_ext['dtime_1']['decoder'] = lambda x: self.decode_date(x)
fields_def_ext['dtime_1']['encoder'] = lambda x: self.encode_date(x)
fields_def_ext['dtime_2']['decoder'] = lambda x: self.decode_time(x)
fields_def_ext['dtime_2']['encoder'] = lambda x: self.encode_time(x)
fields_def_ext['relative_orbit'][
'decoder'] = lambda x: self.decode_rel_orbit(x)
fields_def_ext['relative_orbit'][
'encoder'] = lambda x: self.encode_rel_orbit(x)
super(SgrtFilename, self).__init__(fields,
fields_def_ext,
ext=ext,
pad=SgrtFilename.pad,
delimiter=SgrtFilename.delimiter,
convert=convert,
compact=compact)
[docs]
@classmethod
def from_filename(cls, filename_str, convert=False, compact=False):
"""
Converts a filename given as a string into an SgrtFilename class object.
Parameters
----------
filename_str : str
Filename without any paths (e.g., "M20170725_165004--_SIG0-----_S1BIWGRDH1VVA_146_A0104_EU500M_E048N012T6.tif").
convert: bool, optional
If true, decoding is applied to parts of the filename, where such an operation is available (default is False).
Returns
-------
SgrtFilename
Class representing an SGRT filename.
"""
return super().from_filename(filename_str,
SgrtFilename.fields_def,
pad=SgrtFilename.pad,
delimiter=SgrtFilename.delimiter,
convert=convert,
compact=compact)
@property
def stime(self):
"""
Start time.
Returns
-------
datetime.datetime
Start time.
"""
try:
return datetime.combine(
self["dtime_1"],
self["dtime_2"]) if self.single_date else self["dtime_1"]
except TypeError:
return None
@property
def etime(self):
"""
End time.
Returns
-------
datetime.datetime
End time.
"""
try:
return datetime.combine(
self["dtime_1"],
self["dtime_2"]) if self.single_date else self["dtime_2"]
except TypeError:
return None
@property
def time(self):
"""
Unified time.
Returns
-------
datetime.datetime
Unified time.
"""
try:
if self.single_date:
return self.stime
else:
return self.stime + (self.etime - self.stime) / 2
except TypeError:
return None
@property
def product_id(self):
"""
Builds product id from other filename attributes (e.g. 'S1AIWGRDH').
Returns
-------
product_id: str
Product id consisting of mission id (e.g., 'S1'), spacecraft id (e.g., 'A'), mode id (e.g., 'IW'),
product type (e.g., 'GRD') and resolution class (e.g., 'H').
"""
try:
product_id = "".join([
self["mission_id"], self["spacecraft_id"], self["mode_id"],
self["product_type"], self["res_class"]
])
except TypeError:
product_id = None
return product_id
@property
def ftile(self):
"""
Builds the full tile name from other filename attributes (e.g. 'EU010M_E048N015T1').
Returns
-------
ftile: str
Full tile name consisting of grid name (e.g., 'EU10M') and tile name (e.g., 'E048N015T1').
"""
try:
ftile = "_".join([self["grid_name"], self["tile_name"]])
except TypeError:
ftile = None
return ftile
[docs]
def decode_date(self, string):
"""
Decodes a string into a datetime.date object. The format is given by the class.
Parameters
----------
string: str, object
String needed to be decoded to a datetime.date object.
Returns
-------
datetime.date, object
Original object or datetime.date object parsed from the given string.
"""
if isinstance(string, str):
return datetime.strptime(string, self.date_format).date()
else:
return string
[docs]
def decode_time(self, string):
"""
Decodes a string into a datetime.time/datetime.date object. The format is given by the class and the conversion
follows the 'single_date' setting.
Parameters
----------
string: str, object
String needed to be decoded to a datetime.time/datetime.date object.
Returns
-------
datetime.date, datetime.time, object
Original object, datetime.date or datetime.time object parsed from the given string.
"""
if isinstance(string, str):
if self.single_date:
return datetime.time(
datetime.strptime(string, self.time_format))
else:
return self.decode_date(string)
else:
return string
[docs]
def decode_rel_orbit(self, string):
"""
Decodes a string into an integer.
Parameters
----------
string: str, object
String needed to be decoded to an integer.
Returns
-------
int, object
Original object or integer object parsed from the given string.
"""
if isinstance(string, str):
return int(string)
else:
return string
[docs]
def encode_date(self, time_obj):
"""
Encodes a datetime.datetime/datetime.date object into a string. The format is given by the class.
Parameters
----------
time_obj: datetime.datetime, datetime.date or object
Datetime object needed to be encoded to a string.
Returns
-------
str, object
Original object or str object parsed from the given datetime object.
"""
if isinstance(time_obj, (dt.datetime, dt.date, dt.time)):
return time_obj.strftime(self.date_format)
else:
return time_obj
[docs]
def encode_time(self, time_obj):
"""
Encodes a datetime.datetime/datetime.date object into a string. The format is given by the class.
Parameters
----------
time_obj: datetime.datetime, datetime.date or object
Datetime object needed to be encoded to a string.
Returns
-------
str, object
Original object or str object parsed from the given datetime object.
"""
if isinstance(time_obj, (dt.datetime, dt.time, dt.date)):
if self.single_date:
return time_obj.strftime(self.time_format)
else:
return time_obj.strftime(self.date_format)
else:
return time_obj
[docs]
def encode_rel_orbit(self, relative_orbit):
"""
Encodes a relative orbit number into a string.
Parameters
----------
relative_orbit: int or object
Integer needed to be encoded to a string.
Returns
-------
str, object
Original object or str object parsed from the given integer.
"""
if isinstance(relative_orbit, int):
return "{:03d}".format(relative_orbit)
else:
return relative_orbit
[docs]
def sgrt_path(root,
mode=None,
group=None,
datalog=None,
product=None,
wflow=None,
grid=None,
tile=None,
var=None,
qlook=True,
make_dir=False):
"""
Realisation of the full SGRT folder naming convention, yielding a single
SmartPath.
Parameters
----------
root : str
root directory of the path. must contain satellite sensor at toplevel.
e.g. "R:\Datapool_processed\Sentinel-1_CSAR"
mode : str
e.g "IWGRDH"
group : str, optional
"preprocessed" or "parameters" or "products"
datalog : str, optional
must be "datasets" or "logfiles"
product : str
e.g. "ssm"
wflow : str
e.g. "C1003"
grid : str
e.g. "EQUI7_EU500M"
tile : str
e.g. "E048N012T6"
var : str
e.g. "ssm"
qlook : bool
if the quicklook subdir should be integrated
make_dir : bool
if the directory should be created on the filesystem
Returns
-------
SmartPath
Object for the path
"""
# check the sensor folder name
if root.split(os.sep)[-1] not in allowed_sensor_dirs:
raise ValueError('Wrong input for "root"!')
# define the datalog folder name
if datalog is None:
if isinstance(wflow, str):
datalog = 'datasets'
elif datalog == 'logfiles':
product = None
wflow = None
grid = None
tile = None
var = None
qlook = False
elif datalog == 'datasets':
pass
else:
raise ValueError('Wrong input for "datalog" level!')
# define the group folder name
if group is None:
if wflow.startswith('A'):
group = 'preprocessed'
elif wflow.startswith('B'):
group = 'parameters'
elif wflow.startswith('C'):
group = 'products'
else:
raise ValueError('Wrong input for "wflow" level!')
# defining the folder levels
levels = [mode, group, datalog, product, wflow, grid, tile, var, 'qlooks']
# defining the hierarchy
hierarchy = [
'mode', 'group', 'datalog', 'product', 'wflow', 'grid', 'tile', 'var',
'qlook'
]
if qlook is False:
levels.remove('qlooks')
hierarchy.remove('qlook')
return create_smartpath(root,
hierarchy=hierarchy,
levels=levels,
make_dir=make_dir)
[docs]
def sgrt_tree(root,
target_level=None,
register_file_pattern=None,
subset_level=None,
subset_pattern=None,
subset_unique=False):
"""
Realisation of the full SGRT folder naming convention, yielding a
SmartTree(), reflecting all subfolders as SmartPath()
Parameters
----------
root : str
top level directory of the SGRT dataset, which is the sensor name in
the SGRT naming convention.
E.g.: "R:\Datapool_processed\Sentinel-1_CSAR"
target_level : str, optional
Can speed up things: Level name of target tree-depth.
The SmartTree is only built from directories reaching this level,
and only built down to this level. If not set, all directories are
built down to deepest depth.
register_file_pattern : str tuple, optional
strings defining search pattern for file search for file_register
e.g. ('C1003', 'E048N012T6').
No asterisk is needed ('*')!
Sequence of strings in given tuple is crucial!
Be careful: If the tree is large, this can take a while!
subset_level : str tuple, optional
Name of level in tree's hierarchy where the subset should be applied
e.g. ('tile').
Default level is ('grid')
subset_pattern : str tuple, optional
Strings defining search pattern for subset_level, meaning only paths
matching this pattern at "subset_level" will be included in the SmartTree().
Default pattern is ('EQUI7').
e.g. ('EQUI7', '500M'), or ('500M'). No asterisk is needed ('*')!
Sequence of strings in given tuple is crucial!
subset_unique : bool, optional
defines of the subset will deliver...
True: just one single subtree that matches uniquely the subset_pattern,
and which is rebased to the subset_level.
False: all subtrees that match the subset_pattern (Default).
Returns
-------
SmartTree
Object for the SGRT tree.
"""
# defining the hierarchy
hierarchy = [
'mode', 'group', 'datalog', 'product', 'wflow', 'grid', 'tile', 'var',
'qlook'
]
# Check for allowed directory topnames for "root".
if root.split(os.sep)[-1] in allowed_sensor_dirs:
sgrt_tree = build_smarttree(
root,
hierarchy,
target_level=target_level,
register_file_pattern=register_file_pattern)
else:
raise ValueError('Root-directory "{}" does is '
'not a valid SGRT folder!'.format(root))
# limit the tree to a subtree with all paths that match the subset_pattern at subset_level
if subset_level is not None and not subset_unique:
sgrt_tree = sgrt_tree.get_subtree_matching(
subset_level,
subset_pattern,
register_file_pattern=register_file_pattern)
# limit the tree to a single, unique, small subtree that matches the subset_pattern at subset_level,
# which is re-rooted to that level.
elif subset_level is not None:
sgrt_tree = sgrt_tree.get_subtree_unique_rebased(
subset_level,
subset_pattern,
register_file_pattern=register_file_pattern)
return sgrt_tree
if __name__ == '__main__':
pass