Source code for skysim.settings

   1"""Setup functionality for SkySim. Includes both methods for parsing a
   2configuration TOML, and converting that data into `Settings` (and friends)
   3objects.
   4"""
   5
   6# License: GPLv3+ (see COPYING); Copyright (C) 2025 Tai Withers
   7
   8import tomllib
   9from collections.abc import Mapping
  10from datetime import date, datetime, time, timedelta
  11from functools import cached_property
  12from pathlib import Path
  13from typing import Any, ForwardRef, Self  # pylint: disable=unused-import
  14from zoneinfo import ZoneInfo
  15
  16import numpy as np
  17from astropy import units as u
  18from astropy.coordinates import ICRS, AltAz, Angle, EarthLocation, SkyCoord
  19from astropy.coordinates.name_resolve import NameResolveError
  20from astropy.time import Time
  21from astropy.wcs import WCS
  22from matplotlib.colors import LinearSegmentedColormap
  23from pydantic import (
  24    BaseModel,
  25    ConfigDict,
  26    Field,
  27    NonNegativeFloat,
  28    PositiveFloat,
  29    PositiveInt,
  30    ValidationError,
  31    ValidationInfo,
  32    computed_field,
  33    field_validator,
  34    model_validator,
  35)
  36from timezonefinder import TimezoneFinder
  37
  38from skysim.colours import InputColour, RGBTuple, convert_colour
  39from skysim.utils import FloatArray, IntArray
  40
  41# Type Aliases
  42
  43
# Union of the value types this module accepts for a single configuration option.
type ConfigValue = str | date | time | int | float | dict[str, InputColour] | dict[
    int | float, int
] | list[int | float]
# Read-only mapping from an option name to its value.
type ConfigMapping = Mapping[str, ConfigValue]
# Two-level dictionary: section name -> option name -> value, where a value may
# itself be one further level of option name -> value.
type TOMLConfig = dict[  # pylint: disable=invalid-name
    str, dict[str, ConfigValue | dict[str, ConfigValue]]
]
# The (image, plot) pair of settings objects; forward references because the
# classes are defined further down the module.
type SettingsPair = tuple["ImageSettings", "PlotSettings"]
  52
  53
  54# Constants
  55
  56
# The default configuration file lives in the same directory as this module.
DEFAULT_CONFIG_PATH = Path(__file__).parent / "default.toml"

# Pydantic model configuration shared by the settings classes in this module:
# non-pydantic field types are allowed, unknown keyword arguments are rejected,
# and instances are frozen.
DATACLASS_CONFIG = ConfigDict(
    arbitrary_types_allowed=True,
    extra="forbid",
    frozen=True,
)

# Half of 23 arcminutes, i.e. a radius derived from a 23 arcminute diameter.
AIRY_DISK_RADIUS = 23 * u.arcmin / 2
"""How far light from an object should spread. Based on the SIMBAD image of Vega."""

# Multiplied by the light-spread standard deviation to size the star mesh.
MAXIMUM_LIGHT_SPREAD = 10
"""Calculate the spread of light from an object out to this many standard deviations."""
  70
  71
  72# Classes
  73
  74
[docs] 75class Settings(BaseModel): # type: ignore[misc] 76 """Base class to interpret often-used configuration values. The `Settings` 77 class should never be used or passed directly, but instead should be created 78 only for the purpose of then calling `.get_image_settings()` and 79 `.get_plot_settings()`. 80 """ 81 82 model_config = DATACLASS_CONFIG 83 84 # Stored on initialization 85 input_location: str | list[float] 86 """User-input version of the observing location.""" 87 88 field_of_view: u.Quantity["angle"] # type: ignore[type-arg, name-defined] 89 """Diameter of the area being observed at any time.""" 90 91 altitude_angle: u.Quantity["angle"] # type: ignore[type-arg, name-defined] 92 """Angle of observation (measured from horizon).""" 93 94 azimuth_angle: u.Quantity["angle"] # type: ignore[type-arg, name-defined] 95 """Angle of observation (Eastwards from North).""" 96 image_pixels: PositiveInt 97 """Number of pixels (diameter) for the resulting image.""" 98 99 # Used to instantiate others, and stored, but hidden 100 start_date: date = Field(repr=False) 101 """Starting (local) date of observation.""" 102 103 start_time: time = Field(repr=False) 104 """Starting (local) time of observation.""" 105 106 snapshot_frequency: timedelta = Field(repr=False) 107 """How often an observation should be taken - should be given in concert 108 with `duration` """ 109 110 duration: timedelta = Field(repr=False) 111 """How long the total observation should last - should be given in concert 112 with `snapshot_frequency`""" 113
[docs] 114 @field_validator("field_of_view", "altitude_angle", "azimuth_angle", mode="after") 115 @classmethod 116 def convert_to_deg( 117 cls, angular: u.Quantity["angle"] # type: ignore[type-arg, name-defined] 118 ) -> u.Quantity["degree"]: # type: ignore[type-arg, name-defined] 119 """Convert angular quantities to degrees. 120 121 Parameters 122 ---------- 123 angular : astropy.units.Quantity[angle] 124 Astropy angular quantity. 125 126 Returns 127 ------- 128 astropy.units.Quantity[degree] 129 Input angle in degrees. 130 """ 131 return angular.to(u.deg)
132
[docs] 133 @model_validator(mode="after") 134 def compare_timespans(self) -> Self: 135 """ 136 Confirm that the time between snapshots is not greater than the observation 137 duration. 138 139 Returns 140 ------- 141 Self 142 `Settings` object. 143 144 Raises 145 ------ 146 ValueError 147 Raised if the snapshot frequency is greater than the duration. 148 """ 149 if self.snapshot_frequency > self.duration: 150 raise ValueError( 151 "Frequency of snapshots (observation.interval) cannot be longer than " 152 "observation.duration." 153 ) 154 return self
155 156 # Derived and stored 157 @computed_field() 158 @cached_property 159 def frames(self) -> PositiveInt: 160 """ 161 Calculates number of frames for movie/observations to take. 162 163 Returns 164 ------- 165 pydantic.PositiveInt 166 Number of frames. 167 """ 168 if self.snapshot_frequency.total_seconds() > 0: 169 return int(self.duration / self.snapshot_frequency) 170 return 1 171 172 @computed_field() 173 @cached_property 174 def earth_location(self) -> EarthLocation: 175 """ 176 Looks up where on Earth the user requested the observation be taken from. 177 178 Returns 179 ------- 180 astropy.coordinates.EarthLocation 181 Astropy representation of location on Earth. 182 183 Raises 184 ------ 185 NotImplementedError 186 Raised if location lookup fails. 187 """ 188 try: 189 return EarthLocation.of_address(self.input_location) 190 except NameResolveError as e: 191 if "connection" in e.args[0]: 192 raise ConnectionError(e.args[0].replace("address", "location")) from e 193 raise ValueError(e.args[0].replace("address", "location")) from e 194 195 @computed_field() 196 @cached_property 197 def timezone(self) -> ZoneInfo: 198 """ 199 Look up timezone based on Lat/Long. 200 201 Returns 202 ------- 203 zoneinfo.ZoneInfo 204 Timezone information. 205 206 Raises 207 ------ 208 NotImplementedError 209 Raised in the case that the lookup fails. 210 """ 211 lat, lon = [ 212 l.to(u.deg).value 213 for l in [self.earth_location.lat, self.earth_location.lon] 214 ] 215 tf = TimezoneFinder() 216 tzname = tf.timezone_at(lat=lat, lng=lon) 217 if tzname is None: 218 raise ValueError( # TODO: add test for this error 219 f"Cannot determine timezone for {lat}, {lon} ({self.input_location})" 220 ) 221 222 return ZoneInfo(tzname) 223 224 @computed_field() 225 @cached_property 226 def observation_times(self) -> Time: 227 """ 228 Calculates the times at which to take a snapshot. 229 230 Returns 231 ------- 232 astropy.time.Time 233 Astropy representation of one or more times. 
234 """ 235 start_datetime = datetime( 236 year=self.start_date.year, 237 month=self.start_date.month, 238 day=self.start_date.day, 239 hour=self.start_time.hour, 240 minute=self.start_time.minute, 241 second=self.start_time.second, 242 microsecond=self.start_time.microsecond, 243 tzinfo=self.timezone, 244 ) 245 return Time( 246 [(start_datetime + self.snapshot_frequency * i) for i in range(self.frames)] 247 ) 248 249 @computed_field() 250 @cached_property 251 def observation_radec(self) -> SkyCoord: 252 """ 253 Calculates the observed RA/Dec position for each observation snapshot. 254 255 Returns 256 ------- 257 astropy.coordinates.SkyCoord 258 Astropy representation of one or more coordinates. 259 """ 260 earth_frame = AltAz( 261 obstime=self.observation_times, 262 az=self.azimuth_angle, 263 alt=self.altitude_angle, 264 location=self.earth_location, 265 ) 266 return SkyCoord(earth_frame.transform_to(ICRS())) 267 268 @computed_field() 269 @cached_property 270 def degrees_per_pixel(self) -> u.Quantity["angle"]: # type: ignore[type-arg, name-defined] 271 """ 272 Calculates the number of degrees spanned by each pixel in the resulting image. 273 274 Returns 275 ------- 276 astropy.units.Quantity[angle] 277 Degrees per pixel (pixel considered unitless). 278 """ 279 return (self.field_of_view / self.image_pixels).to(u.deg) 280 281 @computed_field() 282 @cached_property 283 def local_datetimes(self) -> list[datetime]: 284 """Observation snapshot times as timezone-aware python times. 285 286 Returns 287 ------- 288 list[datetime.time] 289 List of observation times. 290 """ 291 utc = [ 292 t.to_datetime().replace(tzinfo=ZoneInfo("UTC")) 293 for t in self.observation_times 294 ] 295 return [u.astimezone(tz=self.timezone) for u in utc] 296 297 @computed_field() 298 @cached_property 299 def wcs_objects(self) -> list[WCS]: 300 """WCS objects for each timestep. 301 302 Returns 303 ------- 304 list[astropy.wcs.WCS] 305 WCS objects for each timestep. 
306 """ 307 wcs_by_frame = [] 308 for radec in self.observation_radec: 309 wcs = WCS(naxis=2) 310 wcs.wcs.crpix = [self.image_pixels / 2] * 2 311 wcs.wcs.cdelt = [self.degrees_per_pixel.value, self.degrees_per_pixel.value] 312 wcs.wcs.crval = [radec.ra.value, radec.dec.value] 313 wcs.wcs.ctype = ["RA", "DEC"] 314 wcs.wcs.cunit = [u.deg, u.deg] 315 wcs_by_frame.append(wcs) 316 return wcs_by_frame 317 318 def __str__(self: "Settings") -> str: 319 result = "" 320 for k, v in self.model_dump().items(): 321 v = str(v).replace("\n", "\n\t") 322 result += f"{k}: {v}\n" 323 return result[:-1] 324
[docs] 325 def get_image_settings(self: "Settings", **kwargs: Any) -> "ImageSettings": 326 """ 327 Generate an `ImageSettings` object inheriting this object's information. 328 329 Parameters 330 ---------- 331 **kwargs 332 Dictionary of arguments to be passed to `ImageSettings`. 333 334 Returns 335 ------- 336 ImageSettings 337 Object containing all passed configuration values as well as those from the 338 instantiation of this `Settings` object. 339 """ 340 return ImageSettings(**vars(self), **kwargs)
341
[docs] 342 def get_plot_settings(self: "Settings", **kwargs: Any) -> "PlotSettings": 343 """ 344 Generate an `PlotSettings` object inheriting this object's information. 345 346 Parameters 347 ---------- 348 **kwargs 349 Dictionary of arguments to be passed to `PlotSettings`. 350 351 Returns 352 ------- 353 PlotSettings 354 Object containing all passed configuration values as well as those from the 355 instantiation of this `Settings` object. 356 """ 357 return PlotSettings(**vars(self), **kwargs)
358 359
class ImageSettings(Settings):  # type: ignore[misc]
    """`Settings` subclass to hold values used when populating the image array.
    Additionally contains a copy of all attributes belonging to the `Settings` instance
    it inherits from.
    """

    model_config = DATACLASS_CONFIG

    # Stored on initialization
    object_colours: dict[str, RGBTuple] = Field()
    """Mapping between object types, and the colours to use on the image."""

    # Used to instantiate others
    colour_values: list[RGBTuple] = Field(repr=False)
    """List of colours to use to fill out the background."""

    colour_time_indices: dict[float | int, int] = Field(repr=False)
    """Mapping between hour of the day (0-24, float) and the index corresponding
    to the colour in `colour_values` to use at that time."""

    magnitude_values: list[float | int] = Field(repr=False)
    """List of maximum magnitude values, to be used in the same manner as
    `colour_values`."""

    magnitude_time_indices: dict[float | int, int] = Field(repr=False)
    """Same as `colour_time_indices`, except applying to magnitude_values. Need not be
    the same as `colour_time_indices`."""

    @field_validator("object_colours", mode="before")
    @classmethod
    def _convert_colour_dict(  # type: ignore[misc]
        cls, colour_dict: dict[str, Any]
    ) -> dict[str, RGBTuple]:
        # Normalise every user-supplied colour before pydantic validates the field.
        return {key: convert_colour(value) for key, value in colour_dict.items()}

    @field_validator("colour_values", mode="before")
    @classmethod
    def _convert_colour_list(  # type: ignore[misc]
        cls, colour_list: list[Any]
    ) -> list[RGBTuple]:
        # Normalise every user-supplied colour before pydantic validates the field.
        return [convert_colour(value) for value in colour_list]

    # Derived and stored
    @computed_field()
    @cached_property
    def maximum_magnitude(self) -> float:
        """The highest magnitude that will ever be visible in the image.

        Returns
        -------
        float
            Magnitude.
        """
        return max(self.magnitude_values)

    @computed_field()
    @cached_property
    def colour_mapping(self) -> LinearSegmentedColormap:
        """Interpolate between the colour-time mappings indicated by `colour_values` and
        `colour_time_indices` to generate an addressable mapping.

        Returns
        -------
        matplotlib.colors.LinearSegmentedColormap
            Callable object on the interval [0,1] returning a RGBTuple.
        """
        # The colormap anchors must be in ascending order; sort by hour so the
        # result does not depend on the order the entries were written in.
        colour_by_time = [
            (hour / 24, self.colour_values[index])
            for hour, index in sorted(self.colour_time_indices.items())
        ]
        return LinearSegmentedColormap.from_list("sky", colour_by_time)

    @computed_field()
    @cached_property
    def magnitude_mapping(self) -> FloatArray:
        """Interpolate between the magnitude-time mappings indicated by
        `magnitude_values` and `magnitude_time_indices` to generate an addressable
        mapping.

        Returns
        -------
        FloatArray
            Array containing the calculated magnitude value for each second of the day.
        """
        # `np.interp` silently returns nonsense when its x-coordinates are not
        # increasing, so order the entries by hour first.
        time_ordered = sorted(self.magnitude_time_indices.items())
        magnitude_day_percentage = [hour / 24 for hour, _ in time_ordered]
        magnitude_by_time = [self.magnitude_values[index] for _, index in time_ordered]

        day_percentages = np.linspace(0, 1, 24 * 60 * 60)
        return np.interp(day_percentages, magnitude_day_percentage, magnitude_by_time)

    @computed_field()
    @cached_property
    def light_spread_stddev(self) -> PositiveFloat:
        """Standard deviation for the Gaussian which defines the spread of starlight.

        Returns
        -------
        pydantic.PositiveFloat
            Standard deviation.
        """
        airy_disk_pixels = AIRY_DISK_RADIUS.to(u.deg) / self.degrees_per_pixel
        airy_disk_pixels = airy_disk_pixels.decompose()

        # assume the airy disk is at 3x standard deviation of the Gaussian
        std_dev = airy_disk_pixels / 3

        # standard deviation is "diameter" whilst airy is "radius"
        std_dev *= 2

        return float(std_dev)

    @computed_field()
    @cached_property
    def area_mesh(self) -> IntArray:
        """Create a mesh of indices that spread out from a central point.

        Returns
        -------
        IntArray
            (2, X, X) array.
        """
        maximum_radius = np.ceil(
            MAXIMUM_LIGHT_SPREAD * self.light_spread_stddev
        ).astype(int)

        radius_vector = np.arange(-maximum_radius, maximum_radius + 1)

        # mesh of points which will map to the region around the star
        return np.array(np.meshgrid(radius_vector, radius_vector))

    @computed_field()
    @cached_property
    def brightness_scale_mesh(self) -> FloatArray:
        """Create a mesh of scaling factors that will dictate how a star's light
        falls off with distance.

        Returns
        -------
        FloatArray
            2D mesh of [0,1] values.
        """
        # radius measurement at each mesh point (float, even though the mesh
        # offsets themselves are integers)
        radial_distance = np.sqrt(self.area_mesh[0] ** 2 + self.area_mesh[1] ** 2)

        # Evaluate the Gaussian on the whole array at once. The result must be a
        # float array: allocating it with `zeros_like` on the integer mesh would
        # truncate every fractional brightness to zero.
        return np.asarray(self.brightness_gaussian(radial_distance), dtype=float)

    def brightness_gaussian(self, radius: NonNegativeFloat) -> NonNegativeFloat:
        """Calculate how much light is observed from a star at some radius away
        from it.

        Parameters
        ----------
        radius : pydantic.NonNegativeFloat
            Distance in pixels.

        Returns
        -------
        pydantic.NonNegativeFloat
            Scaling factor for brightness.
        """
        return np.exp(-(radius**2) / (self.light_spread_stddev**2))
543 544
class PlotSettings(Settings):  # type: ignore[misc]
    """`Settings` subclass to hold values used when generating the final plot.
    Additionally contains a copy of all attributes belonging to the `Settings` instance
    it inherits from.
    """

    model_config = DATACLASS_CONFIG

    # Stored on initialization
    fps: NonNegativeFloat
    """Frames per second of the GIF/video generated. Zero if there is only one
    frame."""

    filename: Path
    """Location to save the plot."""

    figure_size: tuple[PositiveFloat, PositiveFloat]
    """Size of the figure in inches to pass to `matplotlib`."""

    dpi: PositiveInt
    """Dots per inch, passed to `matplotlib`."""

    @computed_field()
    @cached_property
    def observation_info(self) -> str:
        """Generate a string containing information about the observation for the plot.
        Only contains information which is constant throughout all snapshots.

        Returns
        -------
        str
            Formatted string.
        """
        altitude = angle_to_dms(self.altitude_angle)
        azimuth = angle_to_dms(self.azimuth_angle)
        fov = angle_to_dms(self.field_of_view)
        return (
            f"{self.input_location}\n Altitude: {altitude}, "
            f"Azimuth: {azimuth}, FOV: {fov}"
        )

    @computed_field
    @cached_property
    def datetime_strings(self) -> list[str]:
        """Printable observation times.

        Returns
        -------
        list[str]
            List of strings for each time.
        """
        # Seconds are only printed when some snapshot can fall off a whole minute.
        if (self.start_time.second == 0) and divmod(
            self.snapshot_frequency.total_seconds(), 60
        )[1] == 0:
            fmt_string = "%Y-%m-%d %H:%M %Z"
        else:
            fmt_string = "%Y-%m-%d %X %Z"

        return [i.strftime(fmt_string) for i in self.local_datetimes]

    @computed_field
    @cached_property
    def tempfile_path(self) -> Path:
        """Path to store video frame files.

        Returns
        -------
        pathlib.Path
            Directory.
        """
        return self.filename.parent / "SkySimFiles"

    @computed_field
    @cached_property
    def tempfile_zfill(self) -> int:
        """How much zero-padding to use when writing the video frame filenames.

        Returns
        -------
        int
            Number of digits.
        """
        # Convert to a built-in int so the value matches the annotation rather
        # than leaking a NumPy integer.
        return int(np.ceil(np.log10(self.frames)))

    @field_validator("filename", mode="after")
    @classmethod
    def check_parent_directory_exists(cls, filename: Path) -> Path:
        """Raise an error if the directory in which to save the image doesn't exist.

        Parameters
        ----------
        filename : pathlib.Path
            Location to save image.

        Returns
        -------
        pathlib.Path
            Resolved version of the input path.

        Raises
        ------
        ValueError
            Raised if parent directory does not exist.
        """

        if not filename.parent.exists():
            raise ValueError(
                f"Cannot save result '{filename.resolve()}' because parent directory "
                f"'{filename.parent.resolve()}' does not exist."
            )
        return filename.resolve()

    @field_validator("fps", mode="after")
    @classmethod
    def validate_fps(
        cls, input_fps: NonNegativeFloat, info: ValidationInfo
    ) -> NonNegativeFloat:
        """Ensure fps conforms to the described requirements.

        Parameters
        ----------
        input_fps : pydantic.NonNegativeFloat
            Frames per second as input when instantiating the `PlotSettings` object.
        info : pydantic.ValidationInfo
            Pydantic `ValidationInfo` object allowing access to the other
            already-validated fields.

        Returns
        -------
        pydantic.NonNegativeFloat
            Validated fps. Forced to zero when the observation has zero duration.

        Raises
        ------
        ValueError
            Raised if `fps` is given as zero but the observation `duration` implies
            there should be multiple frames.
        """
        duration = info.data.get("duration")
        if duration is None:
            # `duration` is missing from the already-validated data, i.e. it failed
            # its own validation. That failure is reported separately, so skip the
            # cross-check instead of raising an unrelated KeyError.
            return input_fps
        if duration.total_seconds() == 0:
            return 0
        if input_fps == 0:
            raise ValueError(
                f"Non-zero duration ({duration}) implies the "
                "creation of a movie, but the given fps was zero."
            )
        return input_fps
691 692 693# Methods 694 695 696## Top-Level Settings Methods 697 698
def load_from_toml(
    filename: Path, return_settings: bool = False
) -> Settings | SettingsPair:
    """Load configuration options from a TOML file and parse them into `Settings`
    objects.

    Parameters
    ----------
    filename : pathlib.Path
        Location of the configuration file.
    return_settings : bool, optional
        Whether to return the `Settings` object (`True`) or `ImageSettings` and
        `PlotSettings` objects (`False`). Default `False`.

    Returns
    -------
    Settings | tuple[ImageSettings, PlotSettings]
        The bare `Settings` object when `return_settings` is `True`, otherwise the
        `ImageSettings` and `PlotSettings` objects generated from the configuration.

    Raises
    ------
    ValueError
        Raised with a simplified message when pydantic rejects the configuration.
    """

    settings_config, image_config, plot_config = toml_to_dicts(filename)

    try:
        settings = Settings(**settings_config)

        if return_settings:
            return settings

        image_settings = settings.get_image_settings(**image_config)
        plot_settings = settings.get_plot_settings(**plot_config)

        return image_settings, plot_settings

    except ValidationError as e:
        # pydantic does a lot of wrapping around their errors; only the first
        # reported problem is surfaced to the user.
        original_error = e.errors()[0]
        error_message = original_error["msg"]  # "Value error, [original message]"

        if original_error["type"] == "value_error":  # explicitly raised in a validator
            # Strip pydantic's prefix; fall back to the full message if the prefix
            # is unexpectedly missing rather than failing while reporting an error.
            _, separator, remainder = error_message.partition("error, ")
            new_message = remainder if separator else error_message

        else:  # probably a type coercion error listed in
            # https://docs.pydantic.dev/2.10/errors/validation_errors/
            location = original_error["loc"]
            # Model-level errors carry an empty location tuple.
            bad_dict_key = location[0] if location else "settings"
            bad_dict_value = original_error["input"]
            new_message = (
                f"Error processing '{bad_dict_value}' into {bad_dict_key}. "
                f"{error_message}."
            )

        raise ValueError(new_message) from e
752 753 754## Helper Methods 755 756
def toml_to_dicts(
    filename: Path,
) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]:
    """Read the user's configuration file, fill gaps from the default configuration,
    and build the keyword dictionaries for the `Settings` classes.

    Parameters
    ----------
    filename : pathlib.Path
        Path to the TOML config file.

    Returns
    -------
    tuple[dict[str, Any], dict[str, Any], dict[str, Any]]
        Dictionaries for `Settings`, `ImageSettings`, and `PlotSettings`, in that
        order.

    Raises
    ------
    ValueError
        Raised if the config file is not valid TOML.
    """

    # user configuration
    with filename.open(mode="rb") as user_file:
        try:
            toml_config = tomllib.load(user_file)
        except tomllib.TOMLDecodeError as e:
            raise ValueError(f"Error reading config file. {e.args[0]}.") from e

    check_mandatory_toml_keys(toml_config)

    # packaged defaults
    with DEFAULT_CONFIG_PATH.open(mode="rb") as default_file:
        default_config = tomllib.load(default_file)

    def load_or_default(toml_key: str, default_key: str | None = None) -> ConfigValue:
        # User value if present, otherwise the packaged default.
        return get_config_option(toml_config, toml_key, default_config, default_key)

    observation = toml_config["observation"]

    settings_config = {
        # Mandatory
        "input_location": observation["location"],
        "field_of_view": parse_angle_dict(observation["viewing-radius"]),
        "altitude_angle": parse_angle_dict(observation["altitude"]),
        "azimuth_angle": parse_angle_dict(observation["azimuth"]),
        "start_date": observation["date"],
        "start_time": observation["time"],
        # Optional
        "image_pixels": load_or_default("image.pixels"),
        "duration": time_to_timedelta(
            load_or_default("observation.duration")  # type: ignore[arg-type]
        ),
        "snapshot_frequency": time_to_timedelta(
            load_or_default("observation.interval")  # type: ignore[arg-type]
        ),
    }

    # (keyword for `ImageSettings`, option name inside the [image] table); all
    # of these are optional
    image_option_names = (
        ("object_colours", "object-colours"),
        ("colour_values", "sky-colours"),
        ("colour_time_indices", "sky-colours-index-by-time"),
        ("magnitude_values", "maximum-magnitudes"),
        ("magnitude_time_indices", "maximum-magnitudes-index-by-time"),
    )
    image_config = {}
    for field_name, option_name in image_option_names:
        nested_key = f"image.{option_name}"
        image_config[field_name] = load_or_default(nested_key, default_key=nested_key)

    plot_config = {
        # Mandatory
        "filename": toml_config["image"]["filename"],
        # Optional
        "fps": load_or_default("image.fps"),
        "dpi": load_or_default("image.dpi"),
        "figure_size": (
            load_or_default("image.width"),
            load_or_default("image.height"),
        ),
    }

    return settings_config, image_config, plot_config
833 834
def split_nested_key(full_key: str) -> list[str]:
    """Break a dotted key such as 'a.b.c' into its components ['a', 'b', 'c'].

    Parameters
    ----------
    full_key : str
        Dotted key of the form 'a.b.c'.

    Returns
    -------
    list[str]
        The components of the key, in order.
    """
    separator = "."
    components = full_key.split(separator)
    return components
849 850
def access_nested_dictionary(
    dictionary: dict[str, Any], keys: list[str]
) -> ConfigValue:
    """Walk down a nested dictionary following a list of keys and return the value
    found at the end.

    Parameters
    ----------
    dictionary : dict[str, typing.Any]
        The top-level dictionary.
    keys : list[str]
        Keys to follow, outermost first.

    Returns
    -------
    ConfigValue
        The value stored under the final key.
    """
    current_level = dictionary.copy()
    parent_keys = keys[:-1]
    # descend through every key but the last, then read the last one
    for parent_key in parent_keys:
        current_level = current_level[parent_key]
    leaf_key = keys[-1]
    return current_level[leaf_key]
872 873
def check_key_exists(dictionary: TOMLConfig, full_key: str) -> bool:
    """Check if a key exists within a nested dictionary.

    A key counts as existing only if every level of the dotted path can be
    followed and the string form of the final value is non-empty.

    Parameters
    ----------
    dictionary : TOMLConfig
        The top-level dictionary to check.
    full_key : str
        Potentially nested key.

    Returns
    -------
    bool
        Whether or not the key exists.
    """
    try:
        value = access_nested_dictionary(dictionary, split_nested_key(full_key))
    except (KeyError, TypeError):
        # KeyError: some level of the path is missing.
        # TypeError: an intermediate value is not a table (e.g. a plain number
        # where a table was expected), so the nested key cannot exist either.
        return False
    # Explicit comparison rather than `assert`, which is stripped under `-O`.
    return len(str(value)) > 0
895 896
def check_mandatory_toml_keys(dictionary: TOMLConfig) -> None:
    """Verify that the user's TOML configuration contains the keys it must.

    Parameters
    ----------
    dictionary : TOMLConfig
        Loaded user configuration.

    Raises
    ------
    ValueError
        Raised if:
            - a mandatory key is absent
            - none of the keys in an "at least one" group is present
            - a group that must be given as a complete set is only partly present
    """
    # Keys that must always be present.
    mandatory_keys = [
        "observation.location",
        "observation.date",
        "observation.time",
        "image.filename",
    ]

    # Groups where any combination is fine as long as at least one member is
    # present (e.g. an altitude may be given in degrees, arcminutes, arcseconds,
    # or any mix of them).
    angle_units = ("degrees", "arcminutes", "arcseconds")
    one_or_more_keys = []
    for angle_name in ("viewing-radius", "altitude", "azimuth"):
        one_or_more_keys.append(
            [f"observation.{angle_name}.{unit}" for unit in angle_units]
        )

    # Groups that must be either wholly absent or wholly present.
    all_or_none_keys = [
        ["observation.interval", "observation.duration"],
        ["image.width", "image.height"],
    ]

    for key in mandatory_keys:
        if check_key_exists(dictionary, key):
            continue
        raise ValueError(f"Required element {key} was not found.")

    for keyset in one_or_more_keys:
        present = [key for key in keyset if check_key_exists(dictionary, key)]
        if not present:
            raise ValueError(
                f"One or more of {keyset} must be given, but none were found."
            )

    for keyset in all_or_none_keys:
        present = [key for key in keyset if check_key_exists(dictionary, key)]
        # some, but not every, member of the group was supplied
        if present and len(present) != len(keyset):
            raise ValueError(
                f"Some but not all of the keys {keyset} were given. "
                "These keys must be given all together or not at all."
            )
956 957
def parse_angle_dict(
    dictionary: dict[str, int | float],
) -> u.Quantity["angle"]:  # type: ignore[type-arg, name-defined]
    """Convert a dictionary of the form {degrees:X, arcminutes:Y, arcseconds:Z}
    to a single Quantity.

    Parameters
    ----------
    dictionary : dict[str , int | float]
        Dictionary potentially containing angular information. Missing entries
        count as zero.

    Returns
    -------
    astropy.units.Quantity[angle]
        Combined angle.

    Raises
    ------
    ValueError
        Raised if any of the values cannot be converted to a float.
    """
    total_angle = 0 * u.deg

    for key in ["degrees", "arcminutes", "arcseconds"]:
        value = dictionary.get(key, 0)
        try:
            float_value = float(value)
        except (TypeError, ValueError) as e:
            # `float` raises ValueError for unparsable strings and TypeError for
            # values that are neither strings nor numbers (lists, tables, dates);
            # both get the same user-facing message.
            raise ValueError(
                f"Could not convert angular value {key}={value} to a float."
            ) from e

        # Dropping the plural "s" turns the key into the unit name.
        total_angle += float_value * u.Unit(key[:-1])

    return total_angle
988 989
def time_to_timedelta(time_object: time) -> timedelta:
    """Express a `datetime.time` as the `datetime.timedelta` elapsed since midnight.

    Parameters
    ----------
    time_object : datetime.time
        Time as parsed by TOML.

    Returns
    -------
    datetime.timedelta
        Timedelta corresponding to the time from midnight to the given time.
    """
    return timedelta(
        hours=time_object.hour,
        minutes=time_object.minute,
        seconds=time_object.second,
        microseconds=time_object.microsecond,
    )
1008 1009
def get_config_option(
    toml_dictionary: TOMLConfig,
    toml_key: str,
    default_config: TOMLConfig,
    default_key: str | None = None,
) -> ConfigValue:
    """Look a value up in the user's TOML configuration, falling back to the
    provided default configuration when the user did not supply it.

    Parameters
    ----------
    toml_dictionary : TOMLConfig
        TOML configuration.
    toml_key : str
        Nested key to access the TOML dictionary with.
    default_config : TOMLConfig
        Default configuration dictionary.
    default_key : str | None, default None
        Alternative key to access the default dictionary with, if different from
        `toml_key`.

    Returns
    -------
    ConfigValue
        Value as located in one of the dictionaries.
    """
    if not check_key_exists(toml_dictionary, toml_key):
        # not supplied by the user: read the default, under its own key if given
        fallback_key = toml_key if default_key is None else default_key
        fallback_path = split_nested_key(fallback_key)
        return access_nested_dictionary(default_config, fallback_path)

    user_path = split_nested_key(toml_key)
    return access_nested_dictionary(toml_dictionary, user_path)
1041 1042
def angle_to_dms(angle: u.Quantity["angle"]) -> str:  # type: ignore[type-arg,name-defined]
    """Format an astropy angle as a string, showing only as many of the
    degree/arcminute/arcsecond fields as the value needs.

    Parameters
    ----------
    angle : astropy.units.Quantity[angle]
        The angle quantity to format.

    Returns
    -------
    str
        Latex-formatted string.
    """
    total_arcseconds = angle.to(u.arcsec).value

    if total_arcseconds % 60 != 0:
        # not a whole number of arcminutes: all three fields are needed
        field_count = 3
    elif total_arcseconds % 3600 != 0:
        # whole arcminutes, but not a whole number of degrees
        field_count = 2
    else:
        field_count = 1

    return Angle(angle).to_string(fields=field_count, format="latex")