1"""Setup functionality for SkySim. Includes both methods for parsing a
2configuration TOML, and converting that data in `Settings` (and friends)
3objects.
4"""
5
6# License: GPLv3+ (see COPYING); Copyright (C) 2025 Tai Withers
7
8import tomllib
9from collections.abc import Mapping
10from datetime import date, datetime, time, timedelta
11from functools import cached_property
12from pathlib import Path
13from typing import Any, ForwardRef, Self # pylint: disable=unused-import
14from zoneinfo import ZoneInfo
15
16import numpy as np
17from astropy import units as u
18from astropy.coordinates import ICRS, AltAz, Angle, EarthLocation, SkyCoord
19from astropy.coordinates.name_resolve import NameResolveError
20from astropy.time import Time
21from astropy.wcs import WCS
22from matplotlib.colors import LinearSegmentedColormap
23from pydantic import (
24 BaseModel,
25 ConfigDict,
26 Field,
27 NonNegativeFloat,
28 PositiveFloat,
29 PositiveInt,
30 ValidationError,
31 ValidationInfo,
32 computed_field,
33 field_validator,
34 model_validator,
35)
36from timezonefinder import TimezoneFinder
37
38from skysim.colours import InputColour, RGBTuple, convert_colour
39from skysim.utils import FloatArray, IntArray
40
41# Type Aliases
42
43
44type ConfigValue = str | date | time | int | float | dict[str, InputColour] | dict[
45 int | float, int
46] | list[int | float]
47type ConfigMapping = Mapping[str, ConfigValue]
48type TOMLConfig = dict[ # pylint: disable=invalid-name
49 str, dict[str, ConfigValue | dict[str, ConfigValue]]
50]
51type SettingsPair = tuple["ImageSettings", "PlotSettings"]
52
53
54# Constants
55
56
57DEFAULT_CONFIG_PATH = Path(__file__).parent / "default.toml"
58
59DATACLASS_CONFIG = ConfigDict(
60 arbitrary_types_allowed=True,
61 extra="forbid",
62 frozen=True,
63)
64
65AIRY_DISK_RADIUS = 23 * u.arcmin / 2
66"""How far light from an object should spread. Based on the SIMBAD image of Vega."""
67
68MAXIMUM_LIGHT_SPREAD = 10
69"""Calculate the spread of light from an object out to this many standard deviations."""
70
71
72# Classes
73
74
[docs]
75class Settings(BaseModel): # type: ignore[misc]
76 """Base class to interpret often-used configuration values. The `Settings`
77 class should never be used or passed directly, but instead should be created
78 only for the purpose of then calling `.get_image_settings()` and
79 `.get_plot_settings()`.
80 """
81
82 model_config = DATACLASS_CONFIG
83
84 # Stored on initialization
85 input_location: str | list[float]
86 """User-input version of the observing location."""
87
88 field_of_view: u.Quantity["angle"] # type: ignore[type-arg, name-defined]
89 """Diameter of the area being observed at any time."""
90
91 altitude_angle: u.Quantity["angle"] # type: ignore[type-arg, name-defined]
92 """Angle of observation (measured from horizon)."""
93
94 azimuth_angle: u.Quantity["angle"] # type: ignore[type-arg, name-defined]
95 """Angle of observation (Eastwards from North)."""
96 image_pixels: PositiveInt
97 """Number of pixels (diameter) for the resulting image."""
98
99 # Used to instantiate others, and stored, but hidden
100 start_date: date = Field(repr=False)
101 """Starting (local) date of observation."""
102
103 start_time: time = Field(repr=False)
104 """Starting (local) time of observation."""
105
106 snapshot_frequency: timedelta = Field(repr=False)
107 """How often an observation should be taken - should be given in concert
108 with `duration` """
109
110 duration: timedelta = Field(repr=False)
111 """How long the total observation should last - should be given in concert
112 with `snapshot_frequency`"""
113
[docs]
114 @field_validator("field_of_view", "altitude_angle", "azimuth_angle", mode="after")
115 @classmethod
116 def convert_to_deg(
117 cls, angular: u.Quantity["angle"] # type: ignore[type-arg, name-defined]
118 ) -> u.Quantity["degree"]: # type: ignore[type-arg, name-defined]
119 """Convert angular quantities to degrees.
120
121 Parameters
122 ----------
123 angular : astropy.units.Quantity[angle]
124 Astropy angular quantity.
125
126 Returns
127 -------
128 astropy.units.Quantity[degree]
129 Input angle in degrees.
130 """
131 return angular.to(u.deg)
132
[docs]
133 @model_validator(mode="after")
134 def compare_timespans(self) -> Self:
135 """
136 Confirm that the time between snapshots is not greater than the observation
137 duration.
138
139 Returns
140 -------
141 Self
142 `Settings` object.
143
144 Raises
145 ------
146 ValueError
147 Raised if the snapshot frequency is greater than the duration.
148 """
149 if self.snapshot_frequency > self.duration:
150 raise ValueError(
151 "Frequency of snapshots (observation.interval) cannot be longer than "
152 "observation.duration."
153 )
154 return self
155
156 # Derived and stored
157 @computed_field()
158 @cached_property
159 def frames(self) -> PositiveInt:
160 """
161 Calculates number of frames for movie/observations to take.
162
163 Returns
164 -------
165 pydantic.PositiveInt
166 Number of frames.
167 """
168 if self.snapshot_frequency.total_seconds() > 0:
169 return int(self.duration / self.snapshot_frequency)
170 return 1
171
172 @computed_field()
173 @cached_property
174 def earth_location(self) -> EarthLocation:
175 """
176 Looks up where on Earth the user requested the observation be taken from.
177
178 Returns
179 -------
180 astropy.coordinates.EarthLocation
181 Astropy representation of location on Earth.
182
183 Raises
184 ------
185 NotImplementedError
186 Raised if location lookup fails.
187 """
188 try:
189 return EarthLocation.of_address(self.input_location)
190 except NameResolveError as e:
191 if "connection" in e.args[0]:
192 raise ConnectionError(e.args[0].replace("address", "location")) from e
193 raise ValueError(e.args[0].replace("address", "location")) from e
194
195 @computed_field()
196 @cached_property
197 def timezone(self) -> ZoneInfo:
198 """
199 Look up timezone based on Lat/Long.
200
201 Returns
202 -------
203 zoneinfo.ZoneInfo
204 Timezone information.
205
206 Raises
207 ------
208 NotImplementedError
209 Raised in the case that the lookup fails.
210 """
211 lat, lon = [
212 l.to(u.deg).value
213 for l in [self.earth_location.lat, self.earth_location.lon]
214 ]
215 tf = TimezoneFinder()
216 tzname = tf.timezone_at(lat=lat, lng=lon)
217 if tzname is None:
218 raise ValueError( # TODO: add test for this error
219 f"Cannot determine timezone for {lat}, {lon} ({self.input_location})"
220 )
221
222 return ZoneInfo(tzname)
223
224 @computed_field()
225 @cached_property
226 def observation_times(self) -> Time:
227 """
228 Calculates the times at which to take a snapshot.
229
230 Returns
231 -------
232 astropy.time.Time
233 Astropy representation of one or more times.
234 """
235 start_datetime = datetime(
236 year=self.start_date.year,
237 month=self.start_date.month,
238 day=self.start_date.day,
239 hour=self.start_time.hour,
240 minute=self.start_time.minute,
241 second=self.start_time.second,
242 microsecond=self.start_time.microsecond,
243 tzinfo=self.timezone,
244 )
245 return Time(
246 [(start_datetime + self.snapshot_frequency * i) for i in range(self.frames)]
247 )
248
249 @computed_field()
250 @cached_property
251 def observation_radec(self) -> SkyCoord:
252 """
253 Calculates the observed RA/Dec position for each observation snapshot.
254
255 Returns
256 -------
257 astropy.coordinates.SkyCoord
258 Astropy representation of one or more coordinates.
259 """
260 earth_frame = AltAz(
261 obstime=self.observation_times,
262 az=self.azimuth_angle,
263 alt=self.altitude_angle,
264 location=self.earth_location,
265 )
266 return SkyCoord(earth_frame.transform_to(ICRS()))
267
268 @computed_field()
269 @cached_property
270 def degrees_per_pixel(self) -> u.Quantity["angle"]: # type: ignore[type-arg, name-defined]
271 """
272 Calculates the number of degrees spanned by each pixel in the resulting image.
273
274 Returns
275 -------
276 astropy.units.Quantity[angle]
277 Degrees per pixel (pixel considered unitless).
278 """
279 return (self.field_of_view / self.image_pixels).to(u.deg)
280
281 @computed_field()
282 @cached_property
283 def local_datetimes(self) -> list[datetime]:
284 """Observation snapshot times as timezone-aware python times.
285
286 Returns
287 -------
288 list[datetime.time]
289 List of observation times.
290 """
291 utc = [
292 t.to_datetime().replace(tzinfo=ZoneInfo("UTC"))
293 for t in self.observation_times
294 ]
295 return [u.astimezone(tz=self.timezone) for u in utc]
296
297 @computed_field()
298 @cached_property
299 def wcs_objects(self) -> list[WCS]:
300 """WCS objects for each timestep.
301
302 Returns
303 -------
304 list[astropy.wcs.WCS]
305 WCS objects for each timestep.
306 """
307 wcs_by_frame = []
308 for radec in self.observation_radec:
309 wcs = WCS(naxis=2)
310 wcs.wcs.crpix = [self.image_pixels / 2] * 2
311 wcs.wcs.cdelt = [self.degrees_per_pixel.value, self.degrees_per_pixel.value]
312 wcs.wcs.crval = [radec.ra.value, radec.dec.value]
313 wcs.wcs.ctype = ["RA", "DEC"]
314 wcs.wcs.cunit = [u.deg, u.deg]
315 wcs_by_frame.append(wcs)
316 return wcs_by_frame
317
318 def __str__(self: "Settings") -> str:
319 result = ""
320 for k, v in self.model_dump().items():
321 v = str(v).replace("\n", "\n\t")
322 result += f"{k}: {v}\n"
323 return result[:-1]
324
[docs]
325 def get_image_settings(self: "Settings", **kwargs: Any) -> "ImageSettings":
326 """
327 Generate an `ImageSettings` object inheriting this object's information.
328
329 Parameters
330 ----------
331 **kwargs
332 Dictionary of arguments to be passed to `ImageSettings`.
333
334 Returns
335 -------
336 ImageSettings
337 Object containing all passed configuration values as well as those from the
338 instantiation of this `Settings` object.
339 """
340 return ImageSettings(**vars(self), **kwargs)
341
[docs]
342 def get_plot_settings(self: "Settings", **kwargs: Any) -> "PlotSettings":
343 """
344 Generate an `PlotSettings` object inheriting this object's information.
345
346 Parameters
347 ----------
348 **kwargs
349 Dictionary of arguments to be passed to `PlotSettings`.
350
351 Returns
352 -------
353 PlotSettings
354 Object containing all passed configuration values as well as those from the
355 instantiation of this `Settings` object.
356 """
357 return PlotSettings(**vars(self), **kwargs)
358
359
[docs]
360class ImageSettings(Settings): # type: ignore[misc]
361 """`Settings` subclass to hold values used when populating the image array.
362 Additionally contains a copy of all attributes belonging to the `Settings` instance
363 it inherits from.
364 """
365
366 model_config = DATACLASS_CONFIG
367
368 # Stored on initialization
369 object_colours: dict[str, RGBTuple] = Field()
370 """Mapping between object types, and the colours to use on the image."""
371
372 # Used to instantiate others
373 colour_values: list[RGBTuple] = Field(repr=False)
374 """List of colours to use to fill out the background."""
375
376 colour_time_indices: dict[float | int, int] = Field(repr=False)
377 """Mapping between hour of the day (0-24, float) and the index corresponding
378 to the colour in `colour_values` to use at that time."""
379
380 magnitude_values: list[float | int] = Field(repr=False)
381 """List of maximum magnitude values, to be used in the same manner as
382 `colour_values`."""
383
384 magnitude_time_indices: dict[float | int, int] = Field(repr=False)
385 """Same as `colour_time_indices`, except applying to magnitude_values. Need not be
386 the same as `colour_time_indices`."""
387
388 @field_validator("object_colours", mode="before")
389 @classmethod
390 def _convert_colour_dict( # type: ignore[misc]
391 cls, colour_dict: dict[str, Any]
392 ) -> dict[str, RGBTuple]:
393 return {key: convert_colour(value) for key, value in colour_dict.items()}
394
395 @field_validator("colour_values", mode="before")
396 @classmethod
397 def _convert_colour_list( # type: ignore[misc]
398 cls, colour_list: list[Any]
399 ) -> list[RGBTuple]:
400 return [convert_colour(value) for value in colour_list]
401
402 # Derived and stored
403 @computed_field()
404 @cached_property
405 def maximum_magnitude(self) -> float:
406 """The highest magnitude that will ever be visible in the image.
407
408 Returns
409 -------
410 float
411 Magnitude.
412 """
413 return max(self.magnitude_values)
414
415 @computed_field()
416 @cached_property
417 def colour_mapping(self) -> LinearSegmentedColormap:
418 """Interpolate between the colour-time mappings indicated by `colour_values` and
419 `colour_time_indices` to generate an addressable mapping.
420
421 Returns
422 -------
423 matplotlib.colors.LinearSegmentedColormap
424 Callable object on the interval [0,1] returning a RGBTuple.
425 """
426 colour_by_time = [
427 (hour / 24, self.colour_values[index])
428 for hour, index in self.colour_time_indices.items()
429 ]
430 return LinearSegmentedColormap.from_list("sky", colour_by_time)
431
432 @computed_field()
433 @cached_property
434 def magnitude_mapping(self) -> FloatArray:
435 """Interpolate between the magnitude-time mappings indicated by
436 `magnitude_values` and `magnitude_time_indices` to generate an addressable
437 mapping.
438
439 Returns
440 -------
441 FloatArray
442 Array containing the calculated magnitude value for each second of the day.
443 """
444 magnitude_day_percentage = [
445 hour / 24 for hour in self.magnitude_time_indices.keys()
446 ]
447 magnitude_by_time = [
448 self.magnitude_values[index]
449 for index in self.magnitude_time_indices.values()
450 ]
451 day_percentages = np.linspace(0, 1, 24 * 60 * 60)
452 return np.interp(day_percentages, magnitude_day_percentage, magnitude_by_time)
453
454 @computed_field()
455 @cached_property
456 def light_spread_stddev(self) -> PositiveFloat:
457 """Standard deviation for the Gaussian which defines the spread of starlight.
458
459 Returns
460 -------
461 pydantic.PositiveFloat
462 Standard deviation.
463 """
464 airy_disk_pixels = AIRY_DISK_RADIUS.to(u.deg) / self.degrees_per_pixel
465 airy_disk_pixels = airy_disk_pixels.decompose()
466
467 # assume the airy disk is at 3x standard deviation of the Gaussian
468 std_dev = airy_disk_pixels / 3
469
470 # standard deviation is "diameter" whilst airy is "radius"
471 std_dev *= 2
472
473 return float(std_dev)
474
475 @computed_field()
476 @cached_property
477 def area_mesh(self) -> IntArray:
478 """Create a mesh of indices that spread out from a central point.
479
480 Returns
481 -------
482 IntArray
483 (2, X, X) array.
484 """
485 maximum_radius = np.ceil(
486 MAXIMUM_LIGHT_SPREAD * self.light_spread_stddev
487 ).astype(int)
488
489 radius_vector = np.arange(-maximum_radius, maximum_radius + 1)
490
491 # mesh of points which will map to the region around the star
492 return np.array(np.meshgrid(radius_vector, radius_vector))
493
494 @computed_field()
495 @cached_property
496 def brightness_scale_mesh(self) -> FloatArray:
497 """Create a mesh of scaling factors that will dictate how a star's light
498 falls off with distance.
499
500 Returns
501 -------
502 FloatArray
503 2D mesh of [0,1] values.
504 """
505
506 # radius measurement at each mesh point
507 radial_distance = np.sqrt(self.area_mesh[0] ** 2 + self.area_mesh[1] ** 2)
508
509 unique_radii = np.unique(radial_distance)
510
511 # all of the locations where each unique radius is found
512 radius_locations = []
513 for r in unique_radii:
514 locations = np.array(np.where(radial_distance == r)).T
515 radius_locations.append(locations)
516
517 mesh = np.zeros_like(self.area_mesh[0]) # instantiate brightness_scale_mesh
518
519 for r, p in zip(unique_radii, radius_locations):
520 brightness_scale = self.brightness_gaussian(r)
521 # TODO: remove this loop by using a smarter numpy-based way to store
522 # radius_locations entries
523 for x, y in p:
524 mesh[y, x] = brightness_scale
525
526 return mesh
527
[docs]
528 def brightness_gaussian(self, radius: NonNegativeFloat) -> NonNegativeFloat:
529 """Calculate how much light is observed from a star at some radius away
530 from it.
531
532 Parameters
533 ----------
534 radius : pydantic.NonNegativeFloat
535 Distance in pixels.
536
537 Returns
538 -------
539 pydantic.NonNegativeFloat
540 Scaling factor for brightness.
541 """
542 return np.exp(-(radius**2) / (self.light_spread_stddev**2))
543
544
[docs]
545class PlotSettings(Settings): # type: ignore[misc]
546 """`Settings` subclass to hold values used when generating the final plot
547 Additionally contains a copy of all attributes belonging to the `Settings` instance
548 it inherits from.
549 """
550
551 model_config = DATACLASS_CONFIG
552
553 # Stored on initialization
554 fps: NonNegativeFloat
555 """Frames per second of the GIF/video generated. Zero if there is only one
556 frame."""
557
558 filename: Path
559 """Location to save the plot."""
560
561 figure_size: tuple[PositiveFloat, PositiveFloat]
562 """Size of the figure in inches to pass to `matplotlib`."""
563
564 dpi: PositiveInt
565 """Dots per inch, passed to `matplotlib`."""
566
567 @computed_field()
568 @cached_property
569 def observation_info(self) -> str:
570 """Generate a string containing information about the observation for the plot.
571 Only contains information which is constant throughout all snapshots.
572
573 Returns
574 -------
575 str
576 Formatted string.
577 """
578 altitude = angle_to_dms(self.altitude_angle)
579 azimuth = angle_to_dms(self.azimuth_angle)
580 fov = angle_to_dms(self.field_of_view)
581 return (
582 f"{self.input_location}\n Altitude: {altitude}, "
583 f"Azimuth: {azimuth}, FOV: {fov}"
584 )
585
586 @computed_field
587 @cached_property
588 def datetime_strings(self) -> list[str]:
589 """Printable observation times.
590
591 Returns
592 -------
593 list[str]
594 List of strings for each time.
595 """
596 if (self.start_time.second == 0) and divmod(
597 self.snapshot_frequency.total_seconds(), 60
598 )[1] == 0:
599 fmt_string = "%Y-%m-%d %H:%M %Z"
600 else:
601 fmt_string = "%Y-%m-%d %X %Z"
602
603 return [i.strftime(fmt_string) for i in self.local_datetimes]
604
605 @computed_field
606 @cached_property
607 def tempfile_path(self) -> Path:
608 """Path to store video frame files.
609
610 Returns
611 -------
612 pathlib.Path
613 Directory.
614 """
615 return self.filename.parent / "SkySimFiles"
616
617 @computed_field
618 @cached_property
619 def tempfile_zfill(self) -> int:
620 """How much zero-padding to use when writing the video frame filenames.
621
622 Returns
623 -------
624 int
625 Number of digits.
626 """
627 return np.ceil(np.log10(self.frames)).astype(int)
628
[docs]
629 @field_validator("filename", mode="after")
630 @classmethod
631 def check_parent_directory_exists(cls, filename: Path) -> Path:
632 """Raise an error if the directory in which to save the image doesn't exist.
633
634 Parameters
635 ----------
636 filename : pathlib.Path
637 Location to save image.
638
639 Returns
640 -------
641 pathlib.Path :
642 Same as input.
643
644 Raises
645 ------
646 ValueError
647 Raised if parent directory does not exist.
648 """
649
650 if not filename.parent.exists():
651 raise ValueError(
652 f"Cannot save result '{filename.resolve()}' because parent directory "
653 f"'{filename.parent.resolve()}' does not exist."
654 )
655 return filename.resolve()
656
[docs]
657 @field_validator("fps", mode="after")
658 @classmethod
659 def validate_fps(
660 cls, input_fps: NonNegativeFloat, info: ValidationInfo
661 ) -> NonNegativeFloat:
662 """Ensure fps conforms to the described requirements.
663
664 Parameters
665 ----------
666 input_fps : pydantic.NonNegativeFloat
667 Frames per second as input when instantiating the `PlotSettings` object.
668 info : pydantic.ValidationInfo
669 Pydantic `ValidationInfo` object allowing access to the other
670 already-validated fields.
671
672 Returns
673 -------
674 pydantic.NonNegativeFloat
675 Validated fps.
676
677 Raises
678 ------
679 ValueError
680 Raised if `fps` is given as zero but the observation `duration` implies
681 there should be multiple frames.
682 """
683 if info.data["duration"].total_seconds() == 0:
684 return 0
685 if input_fps == 0:
686 raise ValueError(
687 f"Non-zero duration ({info.data['duration']}) implies the "
688 "creation of a movie, but the given fps was zero."
689 )
690 return input_fps
691
692
693# Methods
694
695
696## Top-Level Settings Methods
697
698
[docs]
699def load_from_toml(
700 filename: Path, return_settings: bool = False
701) -> Settings | SettingsPair:
702 """Load configuration options from a TOML file and parse them into `Settings`
703 objects.
704
705 Parameters
706 ----------
707 filename : str
708 Location of the configuration file.
709 return_settings : bool, optional
710 Whether to return the `Settings` object (`True`) or `ImageSettings` and
711 `PlotSettings` objects (`False`). Default `False`.
712
713 Returns
714 -------
715 tuple[ImageSettings, PlotSettings]
716 `Settings` objects generated from the configuration.
717 """
718
719 settings_config, image_config, plot_config = toml_to_dicts(filename)
720
721 try:
722 settings = Settings(**settings_config)
723
724 if return_settings:
725 return settings
726
727 image_settings = settings.get_image_settings(**image_config)
728 plot_settings = settings.get_plot_settings(**plot_config)
729
730 return image_settings, plot_settings
731
732 except ValidationError as e:
733 # pydantic does a lot of wrapping around their errors...
734 original_error = e.errors()[0]
735 error_message = original_error["msg"] # "Value error, [original message]"
736
737 if original_error["type"] == "value_error": # explicitly raised in a validator
738 skip_point = "error, "
739 index = error_message.index(skip_point) + len(skip_point)
740 new_message = error_message[index:]
741
742 else: # probaby a type coercion error listed in
743 # https://docs.pydantic.dev/2.10/errors/validation_errors/
744 bad_dict_key = original_error["loc"][0]
745 bad_dict_value = original_error["input"]
746 new_message = (
747 f"Error processing '{bad_dict_value}' into {bad_dict_key}. "
748 f"{error_message}."
749 )
750
751 raise ValueError(new_message) from e
752
753
754## Helper Methods
755
756
[docs]
757def toml_to_dicts(
758 filename: Path,
759) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]:
760 """Read the configuration file and combine it with the default configuration
761 to get dictionaries of values which can be used for the various `Settings` classes.
762
763 Parameters
764 ----------
765 filename : pathlib.Path
766 Path the the toml config file.
767
768 Returns
769 -------
770 tuple[ConfigMapping]
771 Dictionary for `Settings`, `ImageSettings`, and `PlotSettings`.
772 """
773
774 # open the config file
775 with filename.open(mode="rb") as opened:
776 try:
777 toml_config = tomllib.load(opened)
778 except tomllib.TOMLDecodeError as e:
779 raise ValueError(f"Error reading config file. {e.args[0]}.") from e
780
781 check_mandatory_toml_keys(toml_config)
782
783 with DEFAULT_CONFIG_PATH.open(mode="rb") as default:
784 default_config = tomllib.load(default)
785
786 load_or_default = lambda toml_key, default_key=None: get_config_option(
787 toml_config, toml_key, default_config, default_key
788 )
789
790 settings_config = {
791 # Mandatory
792 "input_location": toml_config["observation"]["location"],
793 "field_of_view": parse_angle_dict(toml_config["observation"]["viewing-radius"]),
794 "altitude_angle": parse_angle_dict(toml_config["observation"]["altitude"]),
795 "azimuth_angle": parse_angle_dict(toml_config["observation"]["azimuth"]),
796 "start_date": toml_config["observation"]["date"],
797 "start_time": toml_config["observation"]["time"],
798 # Optional
799 "image_pixels": load_or_default("image.pixels"),
800 "duration": time_to_timedelta(
801 load_or_default("observation.duration") # type: ignore[arg-type]
802 ),
803 "snapshot_frequency": time_to_timedelta(
804 load_or_default("observation.interval") # type: ignore[arg-type]
805 ),
806 }
807
808 image_config = {
809 k: load_or_default(f"image.{v}", default_key=f"image.{v}")
810 for k, v in {
811 # Optional
812 "object_colours": "object-colours",
813 "colour_values": "sky-colours",
814 "colour_time_indices": "sky-colours-index-by-time",
815 "magnitude_values": "maximum-magnitudes",
816 "magnitude_time_indices": "maximum-magnitudes-index-by-time",
817 }.items()
818 }
819
820 plot_config = {
821 # Mandatory
822 "filename": toml_config["image"]["filename"],
823 # Optional
824 "fps": load_or_default("image.fps"),
825 "dpi": load_or_default("image.dpi"),
826 "figure_size": (
827 load_or_default("image.width"),
828 load_or_default("image.height"),
829 ),
830 }
831
832 return settings_config, image_config, plot_config
833
834
[docs]
835def split_nested_key(full_key: str) -> list[str]:
836 """Convert a string of the form 'a.b.c' into a list of the form ['a','b','c'].
837
838 Parameters
839 ----------
840 full_key : str
841 String of the form 'a.b.c'.
842
843 Returns
844 -------
845 list[str]
846 List generated from string.
847 """
848 return full_key.split(".")
849
850
[docs]
851def access_nested_dictionary(
852 dictionary: dict[str, Any], keys: list[str]
853) -> ConfigValue:
854 """Access a value from an arbitrarily nested dictionary via a list of keys.
855
856 Parameters
857 ----------
858 dictionary : dict[str, typing.Any]
859 The top-level dictionary.
860 keys : list[str]
861 List of dictionary keys.
862
863 Returns
864 -------
865 ConfigValue
866 The accessed value.
867 """
868 subdictionary = dictionary.copy()
869 for key in keys[:-1]:
870 subdictionary = subdictionary[key]
871 return subdictionary[keys[-1]]
872
873
[docs]
874def check_key_exists(dictionary: TOMLConfig, full_key: str) -> bool:
875 """Check if a key exists within a nested dictionary.
876
877 Parameters
878 ----------
879 dictionary : TOMLConfig
880 The top-level dictionary to check.
881 full_key : str
882 Potentially nested key.
883
884 Returns
885 -------
886 bool
887 Whether or not the key exists.
888 """
889 try:
890 value = access_nested_dictionary(dictionary, split_nested_key(full_key))
891 assert len(str(value)) > 0
892 return True
893 except (KeyError, AssertionError):
894 return False
895
896
[docs]
897def check_mandatory_toml_keys(dictionary: TOMLConfig) -> None:
898 """Validate the existence of the required keys in the TOML configuration.
899
900 Parameters
901 ----------
902 dictionary : TOMLConfig
903 Loaded user configuration.
904
905 Raises
906 ------
907 ValueError
908 Raised if:
909 - mandatory keys aren't present
910 - keys that require at least one of some group aren't present
911 - keys that are required in sets are not property provided
912 """
913 mandatory_keys = [f"observation.{key}" for key in ("location", "date", "time")] + [
914 "image.filename"
915 ]
916
917 # Keys where at least one should be present (i.e., user can have any combination of
918 # observation.altitude.degrees, observation.altitude.arcminutes, and
919 # observation.altitude.arcseconds, but at least one of them must be included)
920 one_or_more_keys = [
921 [
922 f"observation.{key}.{unit}"
923 for unit in ("degrees", "arcminutes", "arcseconds")
924 ]
925 for key in ("viewing-radius", "altitude", "azimuth")
926 ]
927
928 # keys which should either be excluded entirely, or present only in complete sets
929 all_or_none_keys = [
930 [f"observation.{key}" for key in ("interval", "duration")],
931 [f"image.{key}" for key in ("width", "height")],
932 ]
933 # check that mandatory keys are provided
934 for key in mandatory_keys:
935 if not check_key_exists(dictionary, key):
936 raise ValueError(f"Required element {key} was not found.")
937
938 # check that one-or-more keys are provided
939 for keyset in one_or_more_keys:
940 keys_exist = [check_key_exists(dictionary, key) for key in keyset]
941 if not any(keys_exist):
942 raise ValueError(
943 f"One or more of {keyset} must be given, but none were found."
944 )
945
946 # all_or_none keys
947 for keyset in all_or_none_keys:
948 keys_exist = [check_key_exists(dictionary, key) for key in keyset]
949 if (not all(keys_exist)) and any(keys_exist):
950 raise ValueError(
951 f"Some but not all of the keys {keyset} were given. "
952 "These keys must be given all together or not at all."
953 )
954
955 return
956
957
[docs]
958def parse_angle_dict(
959 dictionary: dict[str, int | float],
960) -> u.Quantity["angle"]: # type: ignore[type-arg, name-defined]
961 """Convert a dictionary of the form {degrees:X, arcminutes:Y, arcseconds:Z}
962 to a single Quantity.
963
964 Parameters
965 ----------
966 dictionary : dict[str , int | float]
967 Dictionary potentially containing angular information.
968
969 Returns
970 -------
971 astropy.units.Quantity[angle]
972 Combined angle.
973 """
974 total_angle = 0 * u.deg
975
976 for key in ["degrees", "arcminutes", "arcseconds"]:
977 value = dictionary.get(key, 0)
978 try:
979 float_value = float(value)
980 except ValueError as e:
981 raise ValueError(
982 f"Could not convert angular value {key}={value} to a float."
983 ) from e
984
985 total_angle += float_value * u.Unit(key[:-1])
986
987 return total_angle
988
989
[docs]
990def time_to_timedelta(time_object: time) -> timedelta:
991 """Converts a `datetime.time` object to a `datetime.timedelta`.
992
993 Parameters
994 ----------
995 time_object : datetime.time
996 Time as parsed by TOML.
997
998 Returns
999 -------
1000 datetime.timedelta
1001 Timedelta corresponding to the time from midnight to the given time.
1002 """
1003 components = {
1004 key: getattr(time_object, key[:-1])
1005 for key in ("hours", "minutes", "seconds", "microseconds")
1006 }
1007 return timedelta(**components)
1008
1009
[docs]
1010def get_config_option(
1011 toml_dictionary: TOMLConfig,
1012 toml_key: str,
1013 default_config: TOMLConfig,
1014 default_key: str | None = None,
1015) -> ConfigValue:
1016 """Access a config value from the TOML config provided, and if not present, search
1017 the provded default config.
1018
1019 Parameters
1020 ----------
1021 toml_dictionary : TOMLConfig
1022 TOML configuration.
1023 toml_key : str
1024 Nested key to access the TOML dictionary with.
1025 default_config : TOMLConfig
1026 Default configuration dictionary.
1027 default_key : str | None, default None
1028 Alternative key to access the default dictionary with, if different from
1029 `toml_key`.
1030
1031 Returns
1032 -------
1033 ConfigValue
1034 Value as located in one of the dictionaries.
1035 """
1036 if check_key_exists(toml_dictionary, toml_key):
1037 return access_nested_dictionary(toml_dictionary, split_nested_key(toml_key))
1038 if default_key is None:
1039 default_key = toml_key
1040 return access_nested_dictionary(default_config, split_nested_key(default_key))
1041
1042
[docs]
1043def angle_to_dms(angle: u.Quantity["angle"]) -> str: # type: ignore[type-arg,name-defined]
1044 """Convert a astropy angle to a pretty-printed string.
1045
1046 Parameters
1047 ----------
1048 angle : astropy.units.Quantity[angle]
1049 The angle quantity to format.
1050
1051 Returns
1052 -------
1053 str
1054 Latex-formatted string.
1055 """
1056 arcseconds = angle.to(u.arcsec).value
1057
1058 has_seconds = divmod(arcseconds, 60)[1] != 0
1059 has_minutes = divmod(arcseconds, 3600)[1] != 0
1060
1061 if has_seconds:
1062 fields = 3
1063 elif has_minutes:
1064 fields = 2
1065 else:
1066 fields = 1
1067
1068 ap_angle = Angle(angle)
1069 return ap_angle.to_string(fields=fields, format="latex")