1"""Plotting module for SkySim. Combines a `numpy` array of RGB values from
2:doc:`skysim.populate <populate>` with the `~skysim.settings.PlotSettings` plotting
3configuration.
4"""
5
6# License: GPLv3+ (see COPYING); Copyright (C) 2025 Tai Withers
7
8import subprocess
9from collections.abc import Collection
10from pathlib import Path
11
12import numpy as np
13from astropy.visualization.wcsaxes.frame import EllipticalFrame
14from astropy.wcs import WCS
15from matplotlib import pyplot as plt
16from matplotlib.axes import Axes
17
18from skysim.settings import PlotSettings
19from skysim.utils import TEMPFILE_SUFFIX, FloatArray, get_tempfile_path
20
21# Methods
22
23
24## Top-Level Plot Method
25
26
[docs]
27def create_plot(
28 plot_settings: PlotSettings, image_matrix: FloatArray, verbose_level: int
29) -> None:
30 """Main function for this module, creates image files from `image_matrix`.
31
32 Parameters
33 ----------
34 plot_settings : PlotSettings
35 Configuration.
36 image_matrix : FloatArray
37 RGB images.
38 verbose_level : int
39 How much detail to print.
40 """
41 if plot_settings.frames == 1:
42 create_single_plot(plot_settings, image_matrix, verbose_level)
43
44 else:
45 create_multi_plot(plot_settings, image_matrix, verbose_level)
46
47 return
48
49
50## Secondary Plot Methods
51
52
[docs]
53def create_single_plot(
54 plot_settings: PlotSettings, image_matrix: FloatArray, verbose_level: int
55) -> None:
56 """Plotting function for a still image.
57
58 Parameters
59 ----------
60 plot_settings : PlotSettings
61 Configuration.
62 image_matrix : FloatArray
63 Single frame RGB image.
64 verbose_level : int
65 How much detail to print.
66 """
67 save_frame(0, plot_settings, image_matrix[0], plot_settings.filename)
68 if verbose_level > 0:
69 print(f"{plot_settings.filename} saved.")
70 return
71
72
[docs]
73def create_multi_plot(
74 plot_settings: PlotSettings, image_matrix: FloatArray, verbose_level: int
75) -> None:
76 """Plotting function for creating a video.
77
78 Parameters
79 ----------
80 plot_settings : PlotSettings
81 Configuration object, passed to `save_frame`.
82 image_matrix : FloatArray
83 Multi-frame RGB image.
84 verbose_level : int
85 How much detail to print.
86 """
87
88 # create the temporary directory
89 if not plot_settings.tempfile_path.is_dir():
90 try:
91 plot_settings.tempfile_path.mkdir()
92 except PermissionError as e:
93 raise ValueError(
94 "Permission denied creating temporary directory for storing "
95 f"intermediate data products ({plot_settings.tempfile_path}). "
96 "Choose a different path for the output file."
97 ) from e
98
99 # create all the frames
100 results = []
101 for i in range(plot_settings.frames):
102 tempfile_path = get_tempfile_path(plot_settings, i)
103 results.append(save_frame(i, plot_settings, image_matrix[i], tempfile_path))
104 if verbose_level > 1:
105 print(f"{tempfile_path} saved.")
106
107 # convert frames into a movie
108 ffmpeg_call = construct_ffmpeg_call(plot_settings)
109 if verbose_level > 1:
110 print(f"Running ffmpeg with `{ffmpeg_call}`")
111 ffmpeg_return_code = run_ffmpeg(ffmpeg_call)
112 if ffmpeg_return_code == 0:
113 if verbose_level > 0:
114 print(f"{plot_settings.filename} saved.")
115
116 movie_cleanup([i[1] for i in results], plot_settings.tempfile_path, verbose_level)
117
118 return
119
120
121## Generic Helper Methods
122
123
[docs]
124def save_frame(
125 index: int, plot_settings: PlotSettings, frame: FloatArray, filename: Path
126) -> tuple[int, Path]:
127 """Create and save a figure for a single frame.
128
129 Parameters
130 ----------
131 index : int
132 Index of the frame.
133 plot_settings : PlotSettings
134 Configuration object. Attributes accessed are `figure_size`,
135 `wcs_objects`, `observation_info`, `datetime_strings`, and `dpi`.
136 frame : FloatArray
137 RGB image.
138 filename : pathlib.Path
139 Location to save the image.
140
141 Returns
142 -------
143 tuple[int, str]
144 Index and filename.
145 """
146 fig, ax = plt.subplots(
147 figsize=plot_settings.figure_size,
148 subplot_kw={
149 "frame_on": False,
150 "projection": plot_settings.wcs_objects[index],
151 "frame_class": EllipticalFrame,
152 },
153 )
154
155 ax.set(xticks=[], yticks=[])
156 fig.suptitle(plot_settings.observation_info)
157 display_frame(
158 ax,
159 plot_settings.wcs_objects[index],
160 frame,
161 plot_settings.datetime_strings[index],
162 )
163
164 try:
165 plt.savefig(
166 filename,
167 dpi=plot_settings.dpi,
168 bbox_inches="tight",
169 )
170 except PermissionError as e:
171 raise ValueError(
172 f"Permission denied saving image ({filename}). "
173 "Choose a different path for the output file."
174 ) from e
175 plt.close()
176
177 return (index, filename)
178
179
[docs]
180def display_frame(ax: Axes, wcs: WCS, frame: FloatArray, frame_title: str) -> Axes:
181 """Display a single frame on a matplotlib axes.
182
183 Parameters
184 ----------
185 ax : matplotlib.axes.Axes
186 Axes to use.
187 wcs : astropy.wcs.WCS
188 Coordinates of the new frame.
189 frame : FloatArray
190 RGB image.
191 frame_title : str
192 Title of the frame.
193
194 Returns
195 -------
196 matplotlib.axes.Axes
197 Updated axes.
198 """
199 ax.reset_wcs(wcs)
200
201 ax.imshow(frame, origin="lower")
202
203 # axis labels
204 for axis in ax.coords:
205 axis.set_auto_axislabel(False)
206 axis.set_ticks_visible(False)
207 axis.set_ticklabel_visible(False)
208
209 # frame
210 ax.coords.frame.set_linewidth(0)
211
212 ax.set_title(frame_title, backgroundcolor="w", pad=8)
213
214 return ax
215
216
217## Movie-Specific Helper Methods
218
219
[docs]
220def construct_ffmpeg_call(plot_settings: PlotSettings) -> str:
221 """Construct the command to call ffmpeg with. Note that the command is not
222 actually run.
223 Note that the ffmpeg flag `-pix_fmt yuv420p` is required in order for most players,
224 and yuv420p also requires that the pixel dimensions of the movie be divisible by 2.
225
226 Parameters
227 ----------
228 plot_settings : PlotSettings
229 Configuration. Attributes accessed are `figure_size`, `dpi`, `fps`,
230 `tempfile_path`, `tempfile_zfill`, and `filename`.
231
232 Returns
233 -------
234 str
235 Command to run.
236 """
237
238 # create a filter to make the output have pixel dimensions divisible by 2
239 output_pixels = np.ceil(max(plot_settings.figure_size) * plot_settings.dpi).astype(
240 int
241 )
242 if output_pixels % 2 != 0:
243 output_pixels += 1
244 output_filter = (
245 "-filter_complex "
246 '"'
247 f"scale={output_pixels}:{output_pixels}:force_original_aspect_ratio=decrease,"
248 f"pad={output_pixels}:{output_pixels}:(ow-iw)/2:(oh-ih)/2"
249 '"'
250 )
251
252 global_options = "-loglevel warning -hide_banner"
253 input_options = f"-framerate {plot_settings.fps}"
254 input_files = f"{plot_settings.tempfile_path}/%0{plot_settings.tempfile_zfill}d.png"
255 output_options = (
256 f"-y -r {plot_settings.fps} -codec:v libx264 {output_filter} -pix_fmt yuv420p"
257 )
258 return (
259 f"ffmpeg {global_options} {input_options} -i {input_files} {output_options}"
260 f" {plot_settings.filename}"
261 )
262
263
[docs]
264def run_ffmpeg(ffmpeg_call: str) -> int:
265 """Run FFmpeg to convert the set of images into a video.
266
267 Parameters
268 ----------
269 ffmpeg_call : str
270 The shell command to run FFmpeg.
271
272 Returns
273 -------
274 int
275 FFmpeg return code.
276
277 Raises
278 ------
279 ValueError
280 Raised if FFmpeg returns a non-zero exit code.
281 """
282 ffmpeg_out = subprocess.run(
283 ffmpeg_call,
284 shell=True, # run as shell command
285 capture_output=True, # adds stderr and stdout attributes
286 text=True, # interpret stderr and stdout as text
287 check=False, # don't raise exception on non-zero exit code
288 )
289 if ffmpeg_out.returncode != 0:
290 raise ValueError(
291 "Something went wrong compiling the frames into a video. "
292 f"FFmpeg error: {ffmpeg_out.stderr}"
293 )
294
295 return ffmpeg_out.returncode
296
297
[docs]
298def movie_cleanup(
299 filenames: Collection[Path], directory: Path, verbose_level: int
300) -> None:
301 """Clean up the tempfiles used in creating a video.
302
303 Parameters
304 ----------
305 filenames : collections.abc.Collection[pathlib.Path]
306 The image files to delete.
307 directory : pathlib.Path
308 The directory to delete.
309 verbose_level : int
310 How much detail to print.
311
312 Raises
313 ------
314 ValueError
315 Raised if the directory cannot be deleted.
316 """
317 # remove all the tempfiles
318 for path in filenames:
319 if path.suffix == TEMPFILE_SUFFIX:
320 path.unlink()
321 if verbose_level > 1:
322 print(f"{path} removed.")
323
324 # remove the tempfile directory
325 try:
326 if verbose_level > 1:
327 print(f"{directory} removed.")
328 directory.rmdir()
329 except OSError as e:
330 raise ValueError(
331 f"Can't remove temporary directory {directory}. {e.strerror}"
332 ) from e