Skip to content

Base Stats

Bases: ABC

Abstract base class for dataset feature extraction and analysis.

This class defines the interface for reading different annotation formats (YOLO, VOC) and provides a high-performance pipeline for feature extraction. It supports incremental caching, multi-process execution, and UMAP dimensionality reduction for visual manifold analysis.

Source code in tools/stats/base_stats.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
class BaseStats(ABC):
    """
    Abstract base class for dataset feature extraction and analysis.

    This class defines the interface for reading different annotation formats
    (YOLO, VOC) and provides a high-performance pipeline for feature extraction.
    It supports incremental caching, multi-process execution, and UMAP
    dimensionality reduction for visual manifold analysis.
    """
    # Task identifier embedded into generated cache filenames.
    TASK: str = "stats"

    def __init__(
            self,
            source_format: str,
            log_level: str = LevelMapping.debug,
            log_path: Optional[Path] = None,
            settings: Optional[AppSettings] = None,
            cache_io: Optional[CacheIO] = None,
            img_path: Optional[Union[Path, str]] = None,
            extensions: Optional[Tuple[str, ...]] = None,
    ):
        """
        Initializes the analytical engine with specific formats and IO tools.

        Args:
            source_format (str): Annotation format identifier (e.g., 'yolo', 'voc').
            log_level (str): Minimum logging level. Defaults to LevelMapping.debug.
            log_path (Optional[Path]): Directory for log files.
            settings (Optional[AppSettings]): Application-wide settings.
            cache_io (Optional[CacheIO]): Component for Parquet-based caching.
            img_path (Optional[Union[Path, str]]): Path to the dataset images.
            extensions (Optional[Tuple[str, ...]]): Valid image file extensions.

        Raises:
            ValueError: If ``source_format`` is not a supported format.
        """

        # Annotation-file suffix -> reader class.
        self.reader_mapping = {
            ".xml": XMLReader,
            ".txt": TXTReader
        }

        # Format name -> annotation-file suffix.
        self.suffix_mapping = {
            "voc": ".xml",
            "yolo": ".txt"
        }

        self.settings = settings
        self.extensions = extensions
        self.img_path = img_path
        self.margin_threshold: int = self.settings.margin_threshold

        # Resolve the annotation suffix first, then instantiate the matching
        # reader.  (A previous dead assignment looked up reader_mapping with
        # the *format name* -- always a miss, since its keys are suffixes --
        # and was immediately overwritten; it has been removed.)  Fail fast
        # on unknown formats instead of raising an opaque KeyError below.
        self.source_suffix = self.suffix_mapping.get(source_format)
        if self.source_suffix is None:
            raise ValueError(
                f"Unsupported source_format '{source_format}'; "
                f"expected one of {sorted(self.suffix_mapping)}"
            )
        self.reader = self.reader_mapping[self.source_suffix]()

        self.cache_io = cache_io or CacheIO(self.settings)
        self.n_jobs = self.settings.n_jobs
        self.logger = LoggerConfigurator.setup(
            name=self.__class__.__name__,
            log_level=log_level,
            log_path=Path(log_path) / f"{self.__class__.__name__}.log" if log_path else None
        )

    @classmethod
    @abstractmethod
    def _init_worker(cls, images: Dict[str, str]) -> None:
        """
        Initializes a static worker with shared data for multiprocessing.

        Called once per worker process by ProcessPoolExecutor (see
        get_features); subclasses stash ``images`` in process-global state.

        Args:
            images (Dict[str, str]): A dictionary mapping image stems to absolute paths.
        """
        pass

    @staticmethod
    @abstractmethod
    def _analyze_worker(
            file_path: Path,
            reader: BaseReader,
            margin_threshold: int = 5,
            class_mapping: Optional[Dict[str, str]] = None
    ) -> List[Dict[str, str]]:
        """
        Processes a single annotation file to extract features.

        Args:
            file_path (Path): Path to the annotation file.
            reader (BaseReader): Annotation reader instance.
            margin_threshold (int): Pixel margin for boundary analysis.
            class_mapping (Optional[Dict[str, str]]): Map of class IDs to names.

        Returns:
            List[Dict[str, str]]: A list of dictionaries, where each dict represents
                features of one detected object.
        """
        pass

    @staticmethod
    @abstractmethod
    def get_umap_features(df: pd.DataFrame) -> List[str]:
        """
        Defines the list of numeric features to be used for UMAP projection.

        Args:
            df (pd.DataFrame): The extracted feature matrix.

        Returns:
            List[str]: List of column names for dimensionality reduction.
        """
        pass

    def get_features(
            self,
            file_paths: Tuple[Path, ...],
            class_mapping: Optional[Dict[str, str]] = None
    ) -> pd.DataFrame:
        """
        Orchestrates feature extraction using incremental caching and parallel processing.

        This method checks the modification time (mtime) of each file. It only
        processes new or changed files, significantly reducing execution time
        for large datasets.

        Args:
            file_paths (Tuple[Path, ...]): List of annotation files to process.
            class_mapping (Optional[Dict[str, str]]): Class ID to name mapping.

        Returns:
            pd.DataFrame: A complete feature matrix including UMAP coordinates
                and outlier flags.
        """
        if not file_paths:
            return pd.DataFrame()

        cache_file = self.settings.cache_file_path / self.cache_io.generate_cache_filename(
            source_path=file_paths[0].parent,
            cache_name=self.settings.cache_name,
            format=self.source_suffix,
            task=self.TASK
        )

        df_cached = self.cache_io.load(cache_file)

        if df_cached.empty:
            # Cold start: everything must be processed.
            df_final = pd.DataFrame()
            files_for_task = file_paths
        else:
            current_files_state = [
                {ImageStatsKeys.path: str(path.resolve()), ImageStatsKeys.mtime: path.stat().st_mtime}
                for path in file_paths
            ]
            df_disk = pd.DataFrame(current_files_state)
            # Deduplicate on path alone: if a stale cache ever held one path
            # with two different mtimes, a plain drop_duplicates() would keep
            # both rows, fan out the merge, and misalign the mask against
            # df_disk below.
            merged = df_disk.merge(
                df_cached[[ImageStatsKeys.path, ImageStatsKeys.mtime]].drop_duplicates(
                    subset=[ImageStatsKeys.path], keep="last"
                ),
                how="left",
                on=ImageStatsKeys.path,
                suffixes=["", "_old"]
            )

            # A file needs reprocessing when it is absent from the cache or
            # its on-disk mtime differs from the cached one.
            to_update_mask = (
                    (merged[f"{ImageStatsKeys.mtime}_old"].isna()) |
                    (merged[ImageStatsKeys.mtime] != merged[f"{ImageStatsKeys.mtime}_old"]))
            files_for_task = [Path(p) for p in merged.loc[to_update_mask, ImageStatsKeys.path]]
            # Keep cached rows only for unchanged files; rows for modified or
            # deleted files are dropped here.
            df_final = df_cached[df_cached[ImageStatsKeys.path].isin(df_disk[~to_update_mask][ImageStatsKeys.path])]

        if files_for_task:
            self.logger.info(f"Incremental update: processing {len(files_for_task)} files with {self.n_jobs} workers")
            images = {img.stem: str(img.resolve()) for img in self.img_path.iterdir() if
                      img.suffix.lower() in self.extensions}

            worker_func = partial(
                self._analyze_worker,
                reader=self.reader,
                margin_threshold=self.margin_threshold,
                class_mapping=class_mapping)

            with ProcessPoolExecutor(
                    max_workers=self.n_jobs,
                    initializer=self.__class__._init_worker,
                    initargs=(images,)
            ) as executor:
                results = list(executor.map(worker_func, files_for_task))

            # Each worker returns one dict per detected object; flatten.
            new_data = [item for sublist in results for item in sublist]

            if new_data:
                df_new = pd.DataFrame(new_data)
                # NOTE(review): assumes workers store paths exactly as
                # str(path) maps them here; a representation mismatch would
                # leave mtime NaN and force reprocessing every run -- confirm
                # against the _analyze_worker implementations.
                mtime_map = {str(path): path.stat().st_mtime for path in files_for_task}
                df_new[ImageStatsKeys.mtime] = df_new[ImageStatsKeys.path].map(mtime_map)
                df_final = pd.concat([df_final, df_new], ignore_index=True)

                numeric_cols = [
                    col
                    for section in self.settings.img_dataset_report_schema
                    if section["type"] == "numeric"
                    for col in section["columns"]
                ]

                df_final = OutlierDetector.mark_outliers(df_final, numeric_cols)
                self.logger.info(f"computing UMAP coordinates for the entire dataset with {self.n_jobs} workers")
                features = self.get_umap_features(df_final)
                df_final = self.compute_umap_coords(df=df_final, features=features)

        # Persist whenever something changed: new/modified files were
        # processed, or cached rows were dropped because their source files
        # disappeared.  (Previously this save was nested inside the
        # `if files_for_task` branch, so the length check was unreachable and
        # pure deletions never refreshed the cache.)
        if files_for_task or (len(df_cached) != len(df_final)):
            self.cache_io.save(df_final, cache_file)
            self.logger.info(f"Cache updated at {cache_file} with {len(df_final)} records")

        return df_final

    def compute_umap_coords(self, df: pd.DataFrame, features: List[str]) -> pd.DataFrame:
        """
        Performs dimensionality reduction to visualize the dataset manifold.

        Uses StandardScaler for normalization and UMAP to project high-dimensional
        features into a 2D space. Results are saved as 'umap_x' and 'umap_y' columns.

        Args:
            df (pd.DataFrame): The feature matrix.
            features (List[str]): Columns to be used for reduction.

        Returns:
            pd.DataFrame: DataFrame with added UMAP coordinates, or the input
                unchanged when fewer than two informative columns exist.
        """

        x_data = df[features].fillna(0)
        # Zero-variance features carry no information for the projection.
        x_data = x_data.loc[:, x_data.var() > 0]

        if x_data.shape[1] < 2:
            # Not enough signal for a meaningful 2D embedding.
            return df

        x_scaled = StandardScaler().fit_transform(x_data)
        # Scale the neighborhood with dataset size, clamped to [15, 50].
        n_neighbors = int(np.clip(len(x_data) * 0.1, a_min=15, a_max=50))
        reducer = UMAP(n_neighbors=n_neighbors, min_dist=0.1, n_components=2, n_jobs=self.n_jobs)
        embedding = reducer.fit_transform(x_scaled)

        df['umap_x'] = embedding[:, 0]
        df['umap_y'] = embedding[:, 1]

        return df

    def set_class_mapping(self, file_paths: Tuple[Path]) -> Dict[str, str]:
        """
        Identifies and loads the class name mapping from a definition file.

        Specifically looks for 'classes.txt' in the source directory (YOLO standard).

        Args:
            file_paths (Tuple[Path]): List of files in the source directory.

        Returns:
            Dict[str, str]: A dictionary mapping class IDs to human-readable
                names; empty when no definition file is present.
        """
        classes_file = next((path for path in file_paths if path.name == "classes.txt"), None)
        if classes_file is None:
            self.logger.warning(
                f"No classes file found at {file_paths[0].parent}, class names will be taken from annotations as is"
            )
            # Previously execution fell through to reader.read(None) and
            # crashed; with no definition file there is nothing to map.
            return {}

        classes_mapping = self.reader.read(classes_file)
        self.logger.info(f"Class mapping loaded with {len(classes_mapping)} entries")
        # NOTE(review): the mapping is inverted here; assumes the reader
        # yields {name: id} pairs -- confirm with the TXTReader implementation.
        classes_mapping = {value: key for key, value in classes_mapping.items()}
        return classes_mapping

__init__(source_format, log_level=LevelMapping.debug, log_path=None, settings=None, cache_io=None, img_path=None, extensions=None)

Initializes the analytical engine with specific formats and IO tools.

Parameters:

Name Type Description Default
source_format str

Annotation format identifier (e.g., 'yolo', 'voc').

required
log_level str

Minimum logging level. Defaults to LevelMapping.debug.

debug
log_path Optional[Path]

Directory for log files.

None
settings Optional[AppSettings]

Application-wide settings.

None
cache_io Optional[CacheIO]

Component for Parquet-based caching.

None
img_path Optional[Union[Path, str]]

Path to the dataset images.

None
extensions Optional[Tuple[str, ...]]

Valid image file extensions.

None
Source code in tools/stats/base_stats.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def __init__(
        self,
        source_format: str,
        log_level: str = LevelMapping.debug,
        log_path: Optional[Path] = None,
        settings: Optional[AppSettings] = None,
        cache_io: Optional[CacheIO] = None,
        img_path: Optional[Union[Path, str]] = None,
        extensions: Optional[Tuple[str, ...]] = None,
):
    """
    Initializes the analytical engine with specific formats and IO tools.

    Args:
        source_format (str): Annotation format identifier (e.g., 'yolo', 'voc').
        log_level (str): Minimum logging level. Defaults to LevelMapping.debug.
        log_path (Optional[Path]): Directory for log files.
        settings (Optional[AppSettings]): Application-wide settings.
        cache_io (Optional[CacheIO]): Component for Parquet-based caching.
        img_path (Optional[Union[Path, str]]): Path to the dataset images.
        extensions (Optional[Tuple[str, ...]]): Valid image file extensions.

    Raises:
        ValueError: If ``source_format`` is not a supported format.
    """

    # Annotation-file suffix -> reader class.
    self.reader_mapping = {
        ".xml": XMLReader,
        ".txt": TXTReader
    }

    # Format name -> annotation-file suffix.
    self.suffix_mapping = {
        "voc": ".xml",
        "yolo": ".txt"
    }

    self.settings = settings
    self.extensions = extensions
    self.img_path = img_path
    self.margin_threshold: int = self.settings.margin_threshold

    # Resolve the annotation suffix first, then instantiate the matching
    # reader.  (A previous dead assignment looked up reader_mapping with the
    # *format name* -- always a miss, since its keys are suffixes -- and was
    # immediately overwritten; it has been removed.)  Fail fast on unknown
    # formats instead of raising an opaque KeyError below.
    self.source_suffix = self.suffix_mapping.get(source_format)
    if self.source_suffix is None:
        raise ValueError(
            f"Unsupported source_format '{source_format}'; "
            f"expected one of {sorted(self.suffix_mapping)}"
        )
    self.reader = self.reader_mapping[self.source_suffix]()

    self.cache_io = cache_io or CacheIO(self.settings)
    self.n_jobs = self.settings.n_jobs
    self.logger = LoggerConfigurator.setup(
        name=self.__class__.__name__,
        log_level=log_level,
        log_path=Path(log_path) / f"{self.__class__.__name__}.log" if log_path else None
    )

compute_umap_coords(df, features)

Performs dimensionality reduction to visualize the dataset manifold.

Uses StandardScaler for normalization and UMAP to project high-dimensional features into a 2D space. Results are saved as 'umap_x' and 'umap_y' columns.

Parameters:

Name Type Description Default
df DataFrame

The feature matrix.

required
features List[str]

Columns to be used for reduction.

required

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with added UMAP coordinates.

Source code in tools/stats/base_stats.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def compute_umap_coords(self, df: pd.DataFrame, features: List[str]) -> pd.DataFrame:
    """
    Projects the feature matrix onto a 2D manifold for visualization.

    Features are standardized with StandardScaler, then embedded with UMAP.
    The coordinates are written back into the frame as the 'umap_x' and
    'umap_y' columns.

    Args:
        df (pd.DataFrame): The feature matrix.
        features (List[str]): Columns to be used for reduction.

    Returns:
        pd.DataFrame: DataFrame with added UMAP coordinates, or the input
            unchanged when fewer than two informative columns remain.
    """
    feature_matrix = df[features].fillna(0)
    # Zero-variance columns carry no information and would only distort
    # the scaling step, so keep informative columns only.
    informative = feature_matrix.loc[:, feature_matrix.var() > 0]

    if informative.shape[1] < 2:
        # Too little signal for a meaningful 2D embedding.
        return df

    scaled = StandardScaler().fit_transform(informative)
    # Neighborhood size grows with the dataset, clamped to [15, 50].
    neighbors = int(np.clip(len(informative) * 0.1, a_min=15, a_max=50))
    projector = UMAP(
        n_neighbors=neighbors,
        min_dist=0.1,
        n_components=2,
        n_jobs=self.n_jobs,
    )
    coords = projector.fit_transform(scaled)

    df['umap_x'] = coords[:, 0]
    df['umap_y'] = coords[:, 1]

    return df

get_features(file_paths, class_mapping=None)

Orchestrates feature extraction using incremental caching and parallel processing.

This method checks the modification time (mtime) of each file. It only processes new or changed files, significantly reducing execution time for large datasets.

Parameters:

Name Type Description Default
file_paths Tuple[Path, ...]

List of annotation files to process.

required
class_mapping Optional[Dict[str, str]]

Class ID to name mapping.

None

Returns:

Type Description
DataFrame

pd.DataFrame: A complete feature matrix including UMAP coordinates and outlier flags.

Source code in tools/stats/base_stats.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def get_features(
        self,
        file_paths: Tuple[Path, ...],
        class_mapping: Optional[Dict[str, str]] = None
) -> pd.DataFrame:
    """
    Orchestrates feature extraction using incremental caching and parallel processing.

    This method checks the modification time (mtime) of each file. It only
    processes new or changed files, significantly reducing execution time
    for large datasets.

    Args:
        file_paths (Tuple[Path, ...]): List of annotation files to process.
        class_mapping (Optional[Dict[str, str]]): Class ID to name mapping.

    Returns:
        pd.DataFrame: A complete feature matrix including UMAP coordinates
            and outlier flags.
    """
    if not file_paths:
        return pd.DataFrame()

    cache_file = self.settings.cache_file_path / self.cache_io.generate_cache_filename(
        source_path=file_paths[0].parent,
        cache_name=self.settings.cache_name,
        format=self.source_suffix,
        task=self.TASK
    )

    df_cached = self.cache_io.load(cache_file)

    if df_cached.empty:
        # Cold start: everything must be processed.
        df_final = pd.DataFrame()
        files_for_task = file_paths
    else:
        current_files_state = [
            {ImageStatsKeys.path: str(path.resolve()), ImageStatsKeys.mtime: path.stat().st_mtime}
            for path in file_paths
        ]
        df_disk = pd.DataFrame(current_files_state)
        # Deduplicate on path alone: if a stale cache ever held one path with
        # two different mtimes, a plain drop_duplicates() would keep both
        # rows, fan out the merge, and misalign the mask against df_disk.
        merged = df_disk.merge(
            df_cached[[ImageStatsKeys.path, ImageStatsKeys.mtime]].drop_duplicates(
                subset=[ImageStatsKeys.path], keep="last"
            ),
            how="left",
            on=ImageStatsKeys.path,
            suffixes=["", "_old"]
        )

        # A file needs reprocessing when it is absent from the cache or its
        # on-disk mtime differs from the cached one.
        to_update_mask = (
                (merged[f"{ImageStatsKeys.mtime}_old"].isna()) |
                (merged[ImageStatsKeys.mtime] != merged[f"{ImageStatsKeys.mtime}_old"]))
        files_for_task = [Path(p) for p in merged.loc[to_update_mask, ImageStatsKeys.path]]
        # Keep cached rows only for unchanged files; rows for modified or
        # deleted files are dropped here.
        df_final = df_cached[df_cached[ImageStatsKeys.path].isin(df_disk[~to_update_mask][ImageStatsKeys.path])]

    if files_for_task:
        self.logger.info(f"Incremental update: processing {len(files_for_task)} files with {self.n_jobs} workers")
        images = {img.stem: str(img.resolve()) for img in self.img_path.iterdir() if
                  img.suffix.lower() in self.extensions}

        worker_func = partial(
            self._analyze_worker,
            reader=self.reader,
            margin_threshold=self.margin_threshold,
            class_mapping=class_mapping)

        with ProcessPoolExecutor(
                max_workers=self.n_jobs,
                initializer=self.__class__._init_worker,
                initargs=(images,)
        ) as executor:
            results = list(executor.map(worker_func, files_for_task))

        # Each worker returns one dict per detected object; flatten.
        new_data = [item for sublist in results for item in sublist]

        if new_data:
            df_new = pd.DataFrame(new_data)
            # NOTE(review): assumes workers store paths exactly as str(path)
            # maps them here; a representation mismatch would leave mtime NaN
            # and force reprocessing every run -- confirm against the
            # _analyze_worker implementations.
            mtime_map = {str(path): path.stat().st_mtime for path in files_for_task}
            df_new[ImageStatsKeys.mtime] = df_new[ImageStatsKeys.path].map(mtime_map)
            df_final = pd.concat([df_final, df_new], ignore_index=True)

            numeric_cols = [
                col
                for section in self.settings.img_dataset_report_schema
                if section["type"] == "numeric"
                for col in section["columns"]
            ]

            df_final = OutlierDetector.mark_outliers(df_final, numeric_cols)
            self.logger.info(f"computing UMAP coordinates for the entire dataset with {self.n_jobs} workers")
            features = self.get_umap_features(df_final)
            df_final = self.compute_umap_coords(df=df_final, features=features)

    # Persist whenever something changed: new/modified files were processed,
    # or cached rows were dropped because their source files disappeared.
    # (Previously this save was nested inside the `if files_for_task` branch,
    # so the length check was unreachable and pure deletions never refreshed
    # the cache.)
    if files_for_task or (len(df_cached) != len(df_final)):
        self.cache_io.save(df_final, cache_file)
        self.logger.info(f"Cache updated at {cache_file} with {len(df_final)} records")

    return df_final

get_umap_features(df) abstractmethod staticmethod

Defines the list of numeric features to be used for UMAP projection.

Parameters:

Name Type Description Default
df DataFrame

The extracted feature matrix.

required

Returns:

Type Description
List[str]

List[str]: List of column names for dimensionality reduction.

Source code in tools/stats/base_stats.py
116
117
118
119
120
121
122
123
124
125
126
127
128
@staticmethod
@abstractmethod
def get_umap_features(df: pd.DataFrame) -> List[str]:
    """
    Defines the list of numeric features to be used for UMAP projection.

    Abstract hook: subclasses return the subset of column names from the
    extracted feature matrix that should feed the 2D UMAP embedding.

    Args:
        df (pd.DataFrame): The extracted feature matrix.

    Returns:
        List[str]: List of column names for dimensionality reduction.
    """
    pass

set_class_mapping(file_paths)

Identifies and loads the class name mapping from a definition file.

Specifically looks for 'classes.txt' in the source directory (YOLO standard).

Parameters:

Name Type Description Default
file_paths Tuple[Path]

List of files in the source directory.

required

Returns:

Type Description
Dict[str, str]

Dict[str, str]: A dictionary mapping class IDs to human-readable names.

Source code in tools/stats/base_stats.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def set_class_mapping(self, file_paths: Tuple[Path]) -> Dict[str, str]:
    """
    Identifies and loads the class name mapping from a definition file.

    Specifically looks for 'classes.txt' in the source directory (YOLO standard).

    Args:
        file_paths (Tuple[Path]): List of files in the source directory.

    Returns:
        Dict[str, str]: A dictionary mapping class IDs to human-readable
            names; empty when no definition file is present.
    """
    classes_file = next((path for path in file_paths if path.name == "classes.txt"), None)
    if classes_file is None:
        self.logger.warning(
            f"No classes file found at {file_paths[0].parent}, class names will be taken from annotations as is"
        )
        # Previously execution fell through to reader.read(None) and crashed;
        # with no definition file there is nothing to map.
        return {}

    classes_mapping = self.reader.read(classes_file)
    self.logger.info(f"Class mapping loaded with {len(classes_mapping)} entries")
    # NOTE(review): the mapping is inverted here; assumes the reader yields
    # {name: id} pairs -- confirm with the TXTReader implementation.
    classes_mapping = {value: key for key, value in classes_mapping.items()}
    return classes_mapping