1"""Query Simbad & Astropy to get information about celestial objects, using an
2`~skysim.settings.ImageSettings` object to filter the data.
3"""
4
5# License: GPLv3+ (see COPYING); Copyright (C) 2025 Tai Withers
6
7import warnings
8from collections.abc import Collection
9from typing import Any
10
11import numpy as np
12from astropy import units as u
13from astropy.coordinates import EarthLocation, SkyCoord, get_body
14from astropy.table import QTable, Table, unique
15from astropy.time import Time
16from astroquery.exceptions import NoResultsWarning
17from astroquery.simbad import Simbad
18from pydantic import PositiveFloat
19from pyvo.dal.exceptions import DALQueryError
20
21from skysim.colours import RGBTuple
22from skysim.utils import round_columns
23
24# Constants
25
26
27SOLARSYSTEM_BODIES = Table(
28 {
29 "name": [
30 "mercury",
31 "venus",
32 "mars",
33 "jupiter",
34 "saturn",
35 "uranus",
36 "neptune",
37 ],
38 "magnitude.offset": [
39 -0.613,
40 -4.384,
41 -1.601,
42 -9.395,
43 -8.914,
44 -7.11,
45 -7,
46 ],
47 }
48)
49"""Dictionary of bodies (planets) and base magnitudes (used in
50`get_planet_magnitude`)
51"""
52
53BASIC_TABLE = {
54 "names": [
55 "id",
56 "ra",
57 "dec",
58 "magnitude",
59 "spectral_type",
60 ],
61 "dtype": [str, float, float, float, str],
62 "units": [None, "deg", "deg", None, None],
63}
64"""Basic table structure used for celestial objects."""
65
66
67FALLBACK_SPECTRAL_TYPE = "fallback"
68"""The spectral type to assign to an object that doesn't have one. """
69
70
71# Methods
72
73
74## Top-Level Query Methods
75
76
[docs]
77def get_body_locations(
78 observation_times: Time, earth_location: EarthLocation
79) -> dict[str, SkyCoord]:
80 """Get ephemeris for the sun/earth/planets.
81
82 Parameters
83 ----------
84 observation_times : astropy.time.Time
85 Times to check for.
86 earth_location : astropy.coordinates.EarthLocation
87 Viewing location.
88
89 Returns
90 -------
91 dict[str, astropy.coordinates.SkyCoord]
92 Locations for sun, earth, planets as dictionary.
93 """
94
95 all_bodies = list(SOLARSYSTEM_BODIES["name"].data) + ["sun", "earth"]
96
97 return {
98 name: get_body(name, observation_times, location=earth_location)
99 for name in all_bodies
100 }
101
102
[docs]
103def get_planet_table(
104 body_locations: dict[str, SkyCoord],
105) -> list[QTable]:
106 """
107 Get the planetary information at each observing time.
108
109 Parameters
110 ----------
111 body_locations : dict[str, astropy.coordinates.SkyCoord]
112 Dictionary of sun and planet locations for each observation time
113 (includes earth).
114
115 Returns
116 -------
117 list[astropy.table.QTable]
118 QTable of planetary info for each observing time.
119 """
120 sun_locations = body_locations.pop("sun")
121 earth_locations = body_locations.pop("earth")
122 planet_table = []
123
124 # for each time step, use the sun and earth locations
125 for i, (sun, earth) in enumerate(zip(sun_locations, earth_locations)):
126 this_time = QTable(**BASIC_TABLE)
127
128 # add a row to the table for each planet
129 for name, mag_offset in SOLARSYSTEM_BODIES[["name", "magnitude.offset"]]:
130 body_location = body_locations[name][i]
131 sun_distance = body_location.separation_3d(sun).to(u.au).value
132 earth_distance = body_location.separation_3d(earth).to(u.au).value
133 row = {
134 "ra": body_location.ra,
135 "dec": body_location.dec,
136 "magnitude": get_planet_magnitude(
137 mag_offset, sun_distance, earth_distance
138 ),
139 "spectral_type": name,
140 "id": name,
141 }
142 this_time.add_row(row)
143
144 this_time = round_columns(this_time)
145 planet_table.append(this_time)
146
147 return planet_table
148
149
[docs]
150def get_star_table(
151 observation_radec: SkyCoord,
152 field_of_view: u.Quantity["angle"], # type: ignore[type-arg, name-defined]
153 maximum_magnitude: float,
154 object_colours: dict[str, RGBTuple],
155 verbose_level: int = 1,
156) -> QTable:
157 """
158 Query Simbad to get celestial objects.
159
160 Parameters
161 ----------
162 observation_radec : astropy.coordinates.SkyCoord
163 RA, Dec coordinates that get observed.
164 field_of_view : astropy.units.Quantity[angle]
165 Diameter of observation.
166 maximum_magnitude : float
167 Highest magnitude value to search for.
168 object_colours : dict[str, RGBTuple]
169 Colours of the objects - used to check if spectral types are valid.
170 verbose_level : int
171 How much detail to print.
172
173 Returns
174 -------
175 astropy.table.QTable
176 Table of all valid celestial objects.
177 """
178 query_result = run_simbad_query(
179 "region",
180 extra_columns=["otype", "V", "ids", "sp_type"],
181 coordinates=observation_radec,
182 radius=field_of_view / 2,
183 criteria=f"otype != 'err' AND V < {maximum_magnitude}",
184 )
185 # remove/rename columns
186 query_result = clean_simbad_table_columns(query_result)
187
188 # remove the "ids" column & repurposes the "id" column
189 query_result = get_star_name_column(query_result)
190
191 if len(query_result) == 0:
192 if verbose_level > 1:
193 print("Query to SIMBAD resulted in no objects.")
194 return query_result
195
196 # spectral types
197 spectral_types = get_spectral_types(object_colours)
198 query_result = simplify_spectral_types(query_result, spectral_types)
199
200 # remove child elements
201 query_result = remove_child_stars(query_result, maximum_magnitude)
202
203 # final general cleanup
204 query_result = round_columns(query_result)
205 query_result = unique(query_result)
206
207 if verbose_level > 1:
208 print(f"Query to SIMBAD resulted in {len(query_result)} objects.")
209 return query_result
210
211
212## Helper Methods
213
214
[docs]
215def get_spectral_types(object_colours: dict[str, RGBTuple]) -> list[str]:
216 """Convert the user-input object colours dictionary to a list of valid
217 spectral types.
218
219 Parameters
220 ----------
221 object_colours : dict[str, RGBTuple]
222 User input.
223
224 Returns
225 -------
226 list[str]
227 Acceptable spectral types.
228 """
229 spectral_types = [
230 i for i in object_colours.keys() if i not in SOLARSYSTEM_BODIES["name"]
231 ]
232 return [i for i in spectral_types if i != FALLBACK_SPECTRAL_TYPE]
233
234
[docs]
235def run_simbad_query(
236 query_type: str, extra_columns: Collection[str] = (), **kwargs: Any
237) -> QTable:
238 """Query SIMBAD with either a region or TAP request.
239
240 Parameters
241 ----------
242 query_type : str
243 "region" or "tap" - the type of request to send.
244 extra_columns : collections.abc.Collection[str], default `()`
245 Extra columns to add to SIMBAD outputs. Only valid for "region" query
246 type.
247 **kwargs : collections.abc.Mapping
248 Unpacked and passed to the query function.
249
250 Returns
251 -------
252 astropy.table.QTable
253 Simbad result.
254
255 Raises
256 ------
257 ValueError
258 Raised if `query_type` is not "region" or "tap".
259 """
260
261 result = QTable(**BASIC_TABLE)
262 with warnings.catch_warnings(action="ignore", category=NoResultsWarning):
263 if query_type == "region":
264 Simbad.add_votable_fields(*extra_columns)
265 result = QTable(Simbad.query_region(**kwargs))
266 elif query_type == "tap":
267 if len(extra_columns) > 0:
268 raise ValueError( # TODO: add test for this error
269 f"{extra_columns=} was passed to run_simbad_query, but "
270 f"{query_type=} which doesn't support that."
271 )
272 result = QTable(Simbad.query_tap(**kwargs))
273 else:
274 raise ValueError( # TODO: add test for this error
275 f'{query_type=} is invalid, should be one of ["region","tap"].'
276 )
277 Simbad.reset_votable_fields()
278 return result
279
280
[docs]
281def get_planet_magnitude(
282 base_magnitude: float,
283 distance_to_sun: PositiveFloat,
284 distance_to_earth: PositiveFloat,
285) -> float:
286 """Calculate the magnitude for a planet.
287
288 Parameters
289 ----------
290 base_magnitude : float
291 Static base magnitude for the planet.
292 distance_to_sun : pydantic.PositiveFloat
293 Distance (in au) from the planet to the sun.
294 distance_to_earth : pydantic.PositiveFloat
295 Distance (in au) from the planet to the Earth.
296
297 Returns
298 -------
299 float
300 The magnitude.
301 """
302 return (
303 5 * np.log10(distance_to_sun * distance_to_earth).astype(float) + base_magnitude
304 )
305
306
[docs]
307def get_star_name_column(star_table: QTable) -> QTable:
308 """Create human-readable `name` column.
309
310 Parameters
311 ----------
312 star_table : astropy.table.QTable
313 Table of stars from which to generate the name column from the `ids`
314 column.
315
316 Returns
317 -------
318 astropy.table.QTable
319 `star_table` with the ``id`` column replaced by a human-readable `name`
320 column, and the `ids` column removed.
321 """
322
323 star_table["ids_list"] = [i.split("|") for i in star_table["ids"]]
324 names_column = []
325 for simbad_id, namelist in zip(star_table["id"].data, star_table["ids_list"].data):
326 # strip "NAME " from entry
327 item_names = [n[5:] for n in namelist if "NAME" in n]
328
329 # store a human readable name of some form based on what's available
330 if len(item_names) == 0:
331 names_column.append(simbad_id)
332 elif len(item_names) == 1:
333 names_column.append(item_names[0])
334 else:
335 names_column.append("/".join(item_names))
336
337 star_table.replace_column("id", names_column)
338 star_table.remove_columns(["ids", "ids_list"])
339
340 return star_table
341
342
[docs]
343def get_child_stars(
344 parent_stars: tuple[str], maximum_magnitude: float
345) -> Collection[str]:
346 """Query SIMBAD for any child objects of `parent_stars`.
347
348 Parameters
349 ----------
350 parent_stars : tuple[str]
351 SIMAD ids.
352 maximum_magnitude : float
353 Filtering value for the children.
354
355 Returns
356 -------
357 collections.abc.Collection[str]
358 Child ids.
359 """
360
361 parent_stars_list = []
362 for star in parent_stars:
363 if "/" in star:
364 star = star[: star.index("/")]
365 if "'" in star:
366 star = star.replace("'", "''")
367 parent_stars_list.append(star)
368
369 # str(tuple-with-one-element) adds a trailing comma, which trips up SIMBAD,
370 # so we do the string conversion ourself in this case
371 if len(parent_stars_list) == 1:
372 parent_stars_string = f"('{parent_stars_list[0]}')"
373 else:
374 parent_stars_string = f"{tuple(str(i) for i in parent_stars_list)}"
375
376 # write the query
377 parent_query_adql = f"""
378 SELECT main_id as "child", allfluxes.V
379 FROM h_link
380 JOIN ident as p on p.oidref=parent
381 JOIN basic on oid="child"
382 JOIN allfluxes on oid = allfluxes.oidref
383 WHERE p.id in {parent_stars_string}
384 AND V <= {maximum_magnitude};
385 """
386
387 try:
388 children = run_simbad_query("tap", query=parent_query_adql)
389 except DALQueryError as e: # TODO: find out how to force this error
390 # Sometimes one of the parent ids is...problematic
391 # if that's the case, just boot it out
392 if "1 unresolved identifiers" in e.reason:
393 bad_region = e.reason[e.reason.index("[") + 1 : e.reason.index("]")]
394 lineno = int(bad_region[2 : bad_region.index(" ")])
395 get_col = lambda colstring: int(colstring[colstring.index(" ") + 3 :])
396 col_start, col_end = [get_col(a) for a in bad_region.split(" - ")]
397
398 query_split = parent_query_adql.splitlines()
399 query_split[lineno - 1] = (
400 query_split[lineno - 1][: col_start - 1]
401 + query_split[lineno - 1][col_end + 1 :]
402 )
403
404 parent_query_adql = "\n".join(query_split)
405 children = run_simbad_query("tap", query=parent_query_adql)
406 else:
407 children = QTable(**BASIC_TABLE)
408 raise e
409
410 return children["child"].data
411
412
[docs]
413def remove_child_stars(star_table: QTable, maximum_magnitude: float) -> QTable:
414 """Check the given table for parent-child pairs, and remove the children if
415 they exist.
416
417 Parameters
418 ----------
419 star_table : astropy.table.QTable
420 Table to checked.
421 maximum_magnitude : float
422 Filtering value for the children.
423
424 Returns
425 -------
426 astropy.table.QTable
427 Table with only parent items.
428 """
429 parents = star_table["id"] # check all items, regardless of type
430
431 blocksize = 1000 # the maximum number of parents to query at once
432 all_children = []
433 n_blocks = int(len(parents) / blocksize)
434
435 # query for the end of the parents list, leaving only a multiple of blocksize to query
436 all_children.append(
437 get_child_stars(tuple(parents.data[n_blocks * blocksize :]), maximum_magnitude)
438 )
439 # query for the remainder of the parents in blocksize chunks
440 for i in range(n_blocks):
441 all_children.append(
442 get_child_stars(
443 tuple(parents.data[i * blocksize : (i + 1) * blocksize]),
444 maximum_magnitude,
445 )
446 )
447
448 child_items = np.concatenate(all_children) # type: ignore[arg-type,var-annotated]
449
450 # search for and remove children
451 star_table.add_index("id")
452 for child_id in child_items:
453 if child_id in star_table["id"]:
454 star_table.remove_rows(star_table.loc_indices[child_id])
455 star_table.remove_indices("id")
456
457 return star_table
458
459
[docs]
460def get_single_spectral_type(
461 spectral_type: str,
462 acceptable_types: Collection[str],
463) -> str:
464 """Process a SIMBAD spectral type into one of `acceptable_types` or
465 `FALLBACK_SPECTRAL_TYPE`.
466
467 Parameters
468 ----------
469 spectral_type : str
470 The spectral type as given by SIMBAD.
471 acceptable_types : collections.abc.Collection[str]
472 Collection of acceptable spectral types.
473
474 Returns
475 -------
476 str
477 The best match to `acceptable_types` or `FALLBACK_SPECTRAL_TYPE`.
478 """
479 if len(spectral_type) == 0:
480 return FALLBACK_SPECTRAL_TYPE
481
482 # start with most specific type, and work backwards
483 for i in range(len(spectral_type), 0, -1):
484 if spectral_type[:i] in acceptable_types:
485 return spectral_type[:i]
486
487 return FALLBACK_SPECTRAL_TYPE
488
489
[docs]
490def simplify_spectral_types(
491 star_table: QTable,
492 acceptable_types: Collection[str],
493) -> QTable:
494 """Replaces the `spectral_type` column of a table with simplified versions
495 via `get_single_spectral_type`.
496
497 Parameters
498 ----------
499 star_table : astropy.table.QTable
500 Table with a `spectral_type` column.
501 acceptable_types : collections.abc.Collection[str]
502 Collection of acceptable spectral types.
503
504 Returns
505 -------
506 astropy.table.QTable
507 `star_table` with a replaced `spectral_type` column.
508 """
509 old_spectral_types = star_table["spectral_type"].data
510 star_table["spectral_type"] = [
511 get_single_spectral_type(st, acceptable_types) for st in old_spectral_types
512 ]
513 return star_table
514
515
[docs]
516def clean_simbad_table_columns(table: QTable) -> QTable:
517 """Remove and rename some SIMBAD columns - does not fail if the columns do
518 not exist (happens if the table was actually generated from `BASIC_TABLE` and not by
519 an actual query).
520
521 Parameters
522 ----------
523 table : astropy.table.QTable
524 Table to operate on.
525
526 Returns
527 -------
528 astropy.table.QTable
529 "Cleaned" table.
530 """
531 columns_to_remove = [
532 "coo_err_min",
533 "coo_err_angle",
534 "coo_wavelength",
535 "coo_bibcode",
536 "coo_err_maj",
537 "otype",
538 ]
539 for colname in columns_to_remove:
540 if colname in table.colnames:
541 table.remove_column(colname)
542
543 renames = {
544 "main_id": "id",
545 "V": "magnitude",
546 "sp_type": "spectral_type",
547 }
548
549 for old, new in renames.items():
550 if old in table.colnames:
551 table.rename_column(old, new)
552
553 return table