Source code for improver.memprofile
# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Module containing maximum memory profiling utilities."""
import sys
import time
import tracemalloc
from datetime import datetime
from queue import Queue
from resource import RUSAGE_SELF, getrusage
from threading import Thread
from typing import Callable, Tuple
[docs]
def memory_profile_start(outfile_prefix: str) -> Tuple[Thread, Queue]:
"""Starts the memory tracking profiler.
Args:
outfile_prefix:
Prefix for the generated output. 2 files will
be generated: \\*_SNAPSHOT and \\*_MAX_TRACKER.
Returns:
- Active Thread tracking the memory.
- Active Queue for communication to the thread.
"""
queue = Queue()
thread = Thread(target=memory_monitor, args=(queue, outfile_prefix))
thread.start()
return thread, queue
[docs]
def memory_profile_end(queue: Queue, thread: Thread) -> None:
"""Ends the memory tracking profiler.
Args:
queue:
Active queue instance to communicate with active thread.
thread:
Active thread instance running memory tracking.
"""
queue.put("Stop")
thread.join()
[docs]
def memory_monitor(queue: Queue, outfile_prefix: str) -> None:
"""Function to track memory usage, should be run in a separate
thread to the main program.
Samples max_rss every 0.1s, if the max_rss is higher than the
previous max_rss, then creates a tracemalloc snapshot. There is a
performance overhead when using this.
Args:
queue:
Active queue instance to communicate with the thread.
outfile_prefix:
Prefix for the generated output. 2 files will
be generated: \\*_SNAPSHOT and \\*_MAX_TRACKER.
"""
tracemalloc.start()
old_max = 0
snapshot = None
wait_time = 0.1
fout = open("{}_MAX_TRACKER".format(outfile_prefix), "w")
b2mb = 1 / 1048576
if sys.platform == "linux":
# linux outputs max_rss in KB not B
b2mb = 1 / 1024
while True:
if queue.empty():
time.sleep(wait_time)
max_rss = getrusage(RUSAGE_SELF).ru_maxrss
if max_rss > old_max:
snapshot = tracemalloc.take_snapshot()
line = "{} max RSS {:.2f} MiB".format(datetime.now(), max_rss * b2mb)
print(line, file=fout)
old_max = max_rss
else:
snapshot.dump("{}_SNAPSHOT".format(outfile_prefix))
fout.close()
tracemalloc.stop()
return
[docs]
def memory_profile_decorator(func: Callable, outfile_prefix: str) -> Callable:
"""A decorator for convenience of running.
Args:
func:
function to track the maximum memory of.
outfile_prefix:
Prefix for the generated output. 2 files will
be generated: \\*_SNAPSHOT and \\*_MAX_TRACKER.
Returns:
The wrapper
"""
def wrapper(*args, **kwargs):
thread, queue = memory_profile_start(outfile_prefix)
results = func(*args, **kwargs)
memory_profile_end(queue, thread)
return results
return wrapper