Source code for skysim.query

  1"""Query Simbad & Astropy to get information about celestial objects, using an
  2`~skysim.settings.ImageSettings` object to filter the data.
  3"""
  4
  5# License: GPLv3+ (see COPYING); Copyright (C) 2025 Tai Withers
  6
  7import warnings
  8from collections.abc import Collection
  9from typing import Any
 10
 11import numpy as np
 12from astropy import units as u
 13from astropy.coordinates import EarthLocation, SkyCoord, get_body
 14from astropy.table import QTable, Table, unique
 15from astropy.time import Time
 16from astroquery.exceptions import NoResultsWarning
 17from astroquery.simbad import Simbad
 18from pydantic import PositiveFloat
 19from pyvo.dal.exceptions import DALQueryError
 20
 21from skysim.colours import RGBTuple
 22from skysim.utils import round_columns
 23
 24# Constants
 25
 26
 27SOLARSYSTEM_BODIES = Table(
 28    {
 29        "name": [
 30            "mercury",
 31            "venus",
 32            "mars",
 33            "jupiter",
 34            "saturn",
 35            "uranus",
 36            "neptune",
 37        ],
 38        "magnitude.offset": [
 39            -0.613,
 40            -4.384,
 41            -1.601,
 42            -9.395,
 43            -8.914,
 44            -7.11,
 45            -7,
 46        ],
 47    }
 48)
 49"""Dictionary of bodies (planets) and base magnitudes (used in
 50`get_planet_magnitude`)
 51"""
 52
 53BASIC_TABLE = {
 54    "names": [
 55        "id",
 56        "ra",
 57        "dec",
 58        "magnitude",
 59        "spectral_type",
 60    ],
 61    "dtype": [str, float, float, float, str],
 62    "units": [None, "deg", "deg", None, None],
 63}
 64"""Basic table structure used for celestial objects."""
 65
 66
 67FALLBACK_SPECTRAL_TYPE = "fallback"
 68"""The spectral type to assign to an object that doesn't have one. """
 69
 70
 71# Methods
 72
 73
 74## Top-Level Query Methods
 75
 76
[docs] 77def get_body_locations( 78 observation_times: Time, earth_location: EarthLocation 79) -> dict[str, SkyCoord]: 80 """Get ephemeris for the sun/earth/planets. 81 82 Parameters 83 ---------- 84 observation_times : astropy.time.Time 85 Times to check for. 86 earth_location : astropy.coordinates.EarthLocation 87 Viewing location. 88 89 Returns 90 ------- 91 dict[str, astropy.coordinates.SkyCoord] 92 Locations for sun, earth, planets as dictionary. 93 """ 94 95 all_bodies = list(SOLARSYSTEM_BODIES["name"].data) + ["sun", "earth"] 96 97 return { 98 name: get_body(name, observation_times, location=earth_location) 99 for name in all_bodies 100 }
101 102
[docs] 103def get_planet_table( 104 body_locations: dict[str, SkyCoord], 105) -> list[QTable]: 106 """ 107 Get the planetary information at each observing time. 108 109 Parameters 110 ---------- 111 body_locations : dict[str, astropy.coordinates.SkyCoord] 112 Dictionary of sun and planet locations for each observation time 113 (includes earth). 114 115 Returns 116 ------- 117 list[astropy.table.QTable] 118 QTable of planetary info for each observing time. 119 """ 120 sun_locations = body_locations.pop("sun") 121 earth_locations = body_locations.pop("earth") 122 planet_table = [] 123 124 # for each time step, use the sun and earth locations 125 for i, (sun, earth) in enumerate(zip(sun_locations, earth_locations)): 126 this_time = QTable(**BASIC_TABLE) 127 128 # add a row to the table for each planet 129 for name, mag_offset in SOLARSYSTEM_BODIES[["name", "magnitude.offset"]]: 130 body_location = body_locations[name][i] 131 sun_distance = body_location.separation_3d(sun).to(u.au).value 132 earth_distance = body_location.separation_3d(earth).to(u.au).value 133 row = { 134 "ra": body_location.ra, 135 "dec": body_location.dec, 136 "magnitude": get_planet_magnitude( 137 mag_offset, sun_distance, earth_distance 138 ), 139 "spectral_type": name, 140 "id": name, 141 } 142 this_time.add_row(row) 143 144 this_time = round_columns(this_time) 145 planet_table.append(this_time) 146 147 return planet_table
148 149
[docs] 150def get_star_table( 151 observation_radec: SkyCoord, 152 field_of_view: u.Quantity["angle"], # type: ignore[type-arg, name-defined] 153 maximum_magnitude: float, 154 object_colours: dict[str, RGBTuple], 155 verbose_level: int = 1, 156) -> QTable: 157 """ 158 Query Simbad to get celestial objects. 159 160 Parameters 161 ---------- 162 observation_radec : astropy.coordinates.SkyCoord 163 RA, Dec coordinates that get observed. 164 field_of_view : astropy.units.Quantity[angle] 165 Diameter of observation. 166 maximum_magnitude : float 167 Highest magnitude value to search for. 168 object_colours : dict[str, RGBTuple] 169 Colours of the objects - used to check if spectral types are valid. 170 verbose_level : int 171 How much detail to print. 172 173 Returns 174 ------- 175 astropy.table.QTable 176 Table of all valid celestial objects. 177 """ 178 query_result = run_simbad_query( 179 "region", 180 extra_columns=["otype", "V", "ids", "sp_type"], 181 coordinates=observation_radec, 182 radius=field_of_view / 2, 183 criteria=f"otype != 'err' AND V < {maximum_magnitude}", 184 ) 185 # remove/rename columns 186 query_result = clean_simbad_table_columns(query_result) 187 188 # remove the "ids" column & repurposes the "id" column 189 query_result = get_star_name_column(query_result) 190 191 if len(query_result) == 0: 192 if verbose_level > 1: 193 print("Query to SIMBAD resulted in no objects.") 194 return query_result 195 196 # spectral types 197 spectral_types = get_spectral_types(object_colours) 198 query_result = simplify_spectral_types(query_result, spectral_types) 199 200 # remove child elements 201 query_result = remove_child_stars(query_result, maximum_magnitude) 202 203 # final general cleanup 204 query_result = round_columns(query_result) 205 query_result = unique(query_result) 206 207 if verbose_level > 1: 208 print(f"Query to SIMBAD resulted in {len(query_result)} objects.") 209 return query_result
210 211 212## Helper Methods 213 214
[docs] 215def get_spectral_types(object_colours: dict[str, RGBTuple]) -> list[str]: 216 """Convert the user-input object colours dictionary to a list of valid 217 spectral types. 218 219 Parameters 220 ---------- 221 object_colours : dict[str, RGBTuple] 222 User input. 223 224 Returns 225 ------- 226 list[str] 227 Acceptable spectral types. 228 """ 229 spectral_types = [ 230 i for i in object_colours.keys() if i not in SOLARSYSTEM_BODIES["name"] 231 ] 232 return [i for i in spectral_types if i != FALLBACK_SPECTRAL_TYPE]
233 234
[docs] 235def run_simbad_query( 236 query_type: str, extra_columns: Collection[str] = (), **kwargs: Any 237) -> QTable: 238 """Query SIMBAD with either a region or TAP request. 239 240 Parameters 241 ---------- 242 query_type : str 243 "region" or "tap" - the type of request to send. 244 extra_columns : collections.abc.Collection[str], default `()` 245 Extra columns to add to SIMBAD outputs. Only valid for "region" query 246 type. 247 **kwargs : collections.abc.Mapping 248 Unpacked and passed to the query function. 249 250 Returns 251 ------- 252 astropy.table.QTable 253 Simbad result. 254 255 Raises 256 ------ 257 ValueError 258 Raised if `query_type` is not "region" or "tap". 259 """ 260 261 result = QTable(**BASIC_TABLE) 262 with warnings.catch_warnings(action="ignore", category=NoResultsWarning): 263 if query_type == "region": 264 Simbad.add_votable_fields(*extra_columns) 265 result = QTable(Simbad.query_region(**kwargs)) 266 elif query_type == "tap": 267 if len(extra_columns) > 0: 268 raise ValueError( # TODO: add test for this error 269 f"{extra_columns=} was passed to run_simbad_query, but " 270 f"{query_type=} which doesn't support that." 271 ) 272 result = QTable(Simbad.query_tap(**kwargs)) 273 else: 274 raise ValueError( # TODO: add test for this error 275 f'{query_type=} is invalid, should be one of ["region","tap"].' 276 ) 277 Simbad.reset_votable_fields() 278 return result
279 280
[docs] 281def get_planet_magnitude( 282 base_magnitude: float, 283 distance_to_sun: PositiveFloat, 284 distance_to_earth: PositiveFloat, 285) -> float: 286 """Calculate the magnitude for a planet. 287 288 Parameters 289 ---------- 290 base_magnitude : float 291 Static base magnitude for the planet. 292 distance_to_sun : pydantic.PositiveFloat 293 Distance (in au) from the planet to the sun. 294 distance_to_earth : pydantic.PositiveFloat 295 Distance (in au) from the planet to the Earth. 296 297 Returns 298 ------- 299 float 300 The magnitude. 301 """ 302 return ( 303 5 * np.log10(distance_to_sun * distance_to_earth).astype(float) + base_magnitude 304 )
305 306
[docs] 307def get_star_name_column(star_table: QTable) -> QTable: 308 """Create human-readable `name` column. 309 310 Parameters 311 ---------- 312 star_table : astropy.table.QTable 313 Table of stars from which to generate the name column from the `ids` 314 column. 315 316 Returns 317 ------- 318 astropy.table.QTable 319 `star_table` with the ``id`` column replaced by a human-readable `name` 320 column, and the `ids` column removed. 321 """ 322 323 star_table["ids_list"] = [i.split("|") for i in star_table["ids"]] 324 names_column = [] 325 for simbad_id, namelist in zip(star_table["id"].data, star_table["ids_list"].data): 326 # strip "NAME " from entry 327 item_names = [n[5:] for n in namelist if "NAME" in n] 328 329 # store a human readable name of some form based on what's available 330 if len(item_names) == 0: 331 names_column.append(simbad_id) 332 elif len(item_names) == 1: 333 names_column.append(item_names[0]) 334 else: 335 names_column.append("/".join(item_names)) 336 337 star_table.replace_column("id", names_column) 338 star_table.remove_columns(["ids", "ids_list"]) 339 340 return star_table
341 342
[docs] 343def get_child_stars( 344 parent_stars: tuple[str], maximum_magnitude: float 345) -> Collection[str]: 346 """Query SIMBAD for any child objects of `parent_stars`. 347 348 Parameters 349 ---------- 350 parent_stars : tuple[str] 351 SIMAD ids. 352 maximum_magnitude : float 353 Filtering value for the children. 354 355 Returns 356 ------- 357 collections.abc.Collection[str] 358 Child ids. 359 """ 360 361 parent_stars_list = [] 362 for star in parent_stars: 363 if "/" in star: 364 star = star[: star.index("/")] 365 if "'" in star: 366 star = star.replace("'", "''") 367 parent_stars_list.append(star) 368 369 # str(tuple-with-one-element) adds a trailing comma, which trips up SIMBAD, 370 # so we do the string conversion ourself in this case 371 if len(parent_stars_list) == 1: 372 parent_stars_string = f"('{parent_stars_list[0]}')" 373 else: 374 parent_stars_string = f"{tuple(str(i) for i in parent_stars_list)}" 375 376 # write the query 377 parent_query_adql = f""" 378 SELECT main_id as "child", allfluxes.V 379 FROM h_link 380 JOIN ident as p on p.oidref=parent 381 JOIN basic on oid="child" 382 JOIN allfluxes on oid = allfluxes.oidref 383 WHERE p.id in {parent_stars_string} 384 AND V <= {maximum_magnitude}; 385 """ 386 387 try: 388 children = run_simbad_query("tap", query=parent_query_adql) 389 except DALQueryError as e: # TODO: find out how to force this error 390 # Sometimes one of the parent ids is...problematic 391 # if that's the case, just boot it out 392 if "1 unresolved identifiers" in e.reason: 393 bad_region = e.reason[e.reason.index("[") + 1 : e.reason.index("]")] 394 lineno = int(bad_region[2 : bad_region.index(" ")]) 395 get_col = lambda colstring: int(colstring[colstring.index(" ") + 3 :]) 396 col_start, col_end = [get_col(a) for a in bad_region.split(" - ")] 397 398 query_split = parent_query_adql.splitlines() 399 query_split[lineno - 1] = ( 400 query_split[lineno - 1][: col_start - 1] 401 + query_split[lineno - 1][col_end + 1 :] 402 ) 403 404 parent_query_adql = "\n".join(query_split) 405 children = run_simbad_query("tap", query=parent_query_adql) 406 else: 407 children = QTable(**BASIC_TABLE) 408 raise e 409 410 return children["child"].data
411 412
[docs] 413def remove_child_stars(star_table: QTable, maximum_magnitude: float) -> QTable: 414 """Check the given table for parent-child pairs, and remove the children if 415 they exist. 416 417 Parameters 418 ---------- 419 star_table : astropy.table.QTable 420 Table to checked. 421 maximum_magnitude : float 422 Filtering value for the children. 423 424 Returns 425 ------- 426 astropy.table.QTable 427 Table with only parent items. 428 """ 429 parents = star_table["id"] # check all items, regardless of type 430 431 blocksize = 1000 # the maximum number of parents to query at once 432 all_children = [] 433 n_blocks = int(len(parents) / blocksize) 434 435 # query for the end of the parents list, leaving only a multiple of blocksize to query 436 all_children.append( 437 get_child_stars(tuple(parents.data[n_blocks * blocksize :]), maximum_magnitude) 438 ) 439 # query for the remainder of the parents in blocksize chunks 440 for i in range(n_blocks): 441 all_children.append( 442 get_child_stars( 443 tuple(parents.data[i * blocksize : (i + 1) * blocksize]), 444 maximum_magnitude, 445 ) 446 ) 447 448 child_items = np.concatenate(all_children) # type: ignore[arg-type,var-annotated] 449 450 # search for and remove children 451 star_table.add_index("id") 452 for child_id in child_items: 453 if child_id in star_table["id"]: 454 star_table.remove_rows(star_table.loc_indices[child_id]) 455 star_table.remove_indices("id") 456 457 return star_table
458 459
[docs] 460def get_single_spectral_type( 461 spectral_type: str, 462 acceptable_types: Collection[str], 463) -> str: 464 """Process a SIMBAD spectral type into one of `acceptable_types` or 465 `FALLBACK_SPECTRAL_TYPE`. 466 467 Parameters 468 ---------- 469 spectral_type : str 470 The spectral type as given by SIMBAD. 471 acceptable_types : collections.abc.Collection[str] 472 Collection of acceptable spectral types. 473 474 Returns 475 ------- 476 str 477 The best match to `acceptable_types` or `FALLBACK_SPECTRAL_TYPE`. 478 """ 479 if len(spectral_type) == 0: 480 return FALLBACK_SPECTRAL_TYPE 481 482 # start with most specific type, and work backwards 483 for i in range(len(spectral_type), 0, -1): 484 if spectral_type[:i] in acceptable_types: 485 return spectral_type[:i] 486 487 return FALLBACK_SPECTRAL_TYPE
488 489
[docs] 490def simplify_spectral_types( 491 star_table: QTable, 492 acceptable_types: Collection[str], 493) -> QTable: 494 """Replaces the `spectral_type` column of a table with simplified versions 495 via `get_single_spectral_type`. 496 497 Parameters 498 ---------- 499 star_table : astropy.table.QTable 500 Table with a `spectral_type` column. 501 acceptable_types : collections.abc.Collection[str] 502 Collection of acceptable spectral types. 503 504 Returns 505 ------- 506 astropy.table.QTable 507 `star_table` with a replaced `spectral_type` column. 508 """ 509 old_spectral_types = star_table["spectral_type"].data 510 star_table["spectral_type"] = [ 511 get_single_spectral_type(st, acceptable_types) for st in old_spectral_types 512 ] 513 return star_table
514 515
[docs] 516def clean_simbad_table_columns(table: QTable) -> QTable: 517 """Remove and rename some SIMBAD columns - does not fail if the columns do 518 not exist (happens if the table was actually generated from `BASIC_TABLE` and not by 519 an actual query). 520 521 Parameters 522 ---------- 523 table : astropy.table.QTable 524 Table to operate on. 525 526 Returns 527 ------- 528 astropy.table.QTable 529 "Cleaned" table. 530 """ 531 columns_to_remove = [ 532 "coo_err_min", 533 "coo_err_angle", 534 "coo_wavelength", 535 "coo_bibcode", 536 "coo_err_maj", 537 "otype", 538 ] 539 for colname in columns_to_remove: 540 if colname in table.colnames: 541 table.remove_column(colname) 542 543 renames = { 544 "main_id": "id", 545 "V": "magnitude", 546 "sp_type": "spectral_type", 547 } 548 549 for old, new in renames.items(): 550 if old in table.colnames: 551 table.rename_column(old, new) 552 553 return table