Module src.Simulation

Allows user to run single simulations or whole scenarios with different custom settings, which then can be analysed with various postprocessing methods.

Expand source code
"""
Allows user to run single simulations or whole scenarios
with different custom settings, which then can be analysed
with various postprocessing methods.
"""

__all__ = ['Simulation', 'Scenarios', 'PostProcessing']

import json
import csv
import os
import shutil
from os.path import sep
import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection
from matplotlib.colors import ListedColormap, BoundaryNorm
import numpy as np
from src.Network import Group, Population
from src.Utils import ProgressBar, Standalones


################################################################################################
################################################################################################
################################################################################################


class Simulation:
    def __init__(self, settings: dict):
        """
        Create a Simulation object with the given settings.

        Parameters
        ----------
        settings : dict
            The dictionary containing all relevant settings for
            the simulation.
        """

        self.settings = None
        self.change_settings(settings)

        self.population = Population.load_from_file(self.settings["population_file"])
        self._population_init = self.population.copy()
        self.groups = {"Infected": Group("Infected"),
                       "Recovered": Group("Recovered"),
                       "Vaccinated": Group("Vaccinated"),
                       "Dead": Group("Dead"),
                       "Quarantined": Group("Quarantined")}
        self.stats = {"new_infected": [0],
                      "new_recovered": [0],
                      "new_susceptible": [0],
                      "new_vaccinated": [0],
                      "new_dead": [0],
                      "ill_vaccinated": [0],
                      "test_results_-": [0],
                      "test_results_+": [0],
                      "seven_day_incidence": [0],
                      "in_lockdown": [0]
                      }
        self.arrange_lockdown = False
        self.lockdown_duration = 0
        self.lockdown_ended = -np.inf

    def start_iteration(self):
        """
        Starts the simulation and runs it until infection numbers
        drop to zero or time given in 'maximum_simulation_time_interval'
        is reached.
        """

        def initialize_groups():
            def put_inits_in_respective_group():
                gen_params = lambda: {
                    "incubation_period": -1,
                    "infection_period": np.random.poisson(c_infection),
                    "immunity_period": np.random.poisson(c_immunity)
                }

                for ini_inf in ini_infs:
                    ini_inf.infect(ini_inf, 0, gen_params())
                    self.groups["Infected"].add_member(ini_inf)

                for ini_rec in ini_recs:
                    ini_rec.make_immune(np.random.poisson(c_immunity))
                    self.groups["Recovered"].add_member(ini_rec)
                    ini_rec.recovered = True

                for ini_vac in ini_vacs:
                    ini_vac.make_immune(np.random.poisson(c_vac_immunity))
                    self.groups["Vaccinated"].add_member(ini_vac)
                    ini_vac.vaccinated = True

                for group in self.groups.values():
                    group.counter.save_count()
                    group.counter.squash_history()

            infs_recs_vacs = np.random.choice(self.population.members,
                                              size=n_ini_inf + n_ini_recs + n_ini_vacs,
                                              replace=False)

            ini_infs = infs_recs_vacs[:n_ini_inf]
            ini_recs = infs_recs_vacs[n_ini_inf:n_ini_inf + n_ini_recs]
            ini_vacs = infs_recs_vacs[n_ini_inf + n_ini_recs:]

            put_inits_in_respective_group()

        def simulate_group(group: Group):
            if group.name == "Infected":
                for member in group:
                    if member.make_tick("infectious"):
                        n_inner, n_outer = np.random.poisson(c_inner), np.random.poisson(c_outer)

                        if self.arrange_lockdown:
                            n_inner, n_outer = np.random.poisson(3/2 * c_inner), np.random.poisson(1/2 * c_outer)

                        if member.quarantined:
                            n_inner, n_outer = np.random.poisson(2 * c_inner), 0

                        gen_params = lambda: {
                            "infection_probability_heuristic": infection_heuristic,
                            "vaccine_failure_probability_heuristic": vaccine_heuristic,
                            "incubation_period": np.random.poisson(c_incubation),
                            "infection_period": np.random.poisson(c_infection),
                            "immunity_period": np.random.poisson(c_immunity),
                        }

                        household = self.population.households[member.properties["household"]]
                        new_members["newly_infected"] += household.spread_disease(member, n_inner, tick, gen_params())
                        new_members["newly_infected"] += self.population.spread_disease(member, n_outer, tick,
                                                                                        gen_params())

                        if member.make_tick("recover"):
                            new_members["newly_recovered"] += [member]
                            member.recovered = True

                        elif np.random.uniform() < mortality_heuristic(member.properties):
                            new_members["new_dead"] += [member]
                            member.make_dead(tick)

                    if member.vaccinated:
                        member.make_tick("vaccine")

                new_members["newly_infected_vac"] = [m for m in new_members["newly_infected"] if m.vaccinated]

            elif group.name == "Recovered":
                for member in group:
                    if member.make_tick("immunity"):
                        new_members["newly_susceptible_rec"] += [member]
                        new_members["newly_susceptible"] += [member]

            elif group.name == "Vaccinated":
                for member in group:
                    if member.make_tick("immunity"):
                        new_members["newly_susceptible_vac"] += [member]
                        new_members["newly_susceptible"] += [member]

                    elif not member.infected:
                        member.make_tick("vaccine")

            elif group.name == "Dead":
                pass

            elif group.name == "Quarantined":
                for member in group:
                    if member.make_tick("quarantine", tick):
                        group.remove_member(member)

            else:
                raise ValueError("Group '" + group.name + "' does not have an update function")

        def move_members_to_new_groups():
            for member in new_members["newly_susceptible_rec"]:
                self.groups["Recovered"].remove_member(member)
                member.recovered = False

            for member in new_members["newly_susceptible_vac"]:
                self.groups["Vaccinated"].remove_member(member)
                member.vaccinated = False

            Group.move(new_members["newly_recovered"], self.groups["Infected"], self.groups["Recovered"])

            Group.move(new_members["new_dead"], self.groups["Infected"], self.groups["Dead"])

            for member in new_members["new_dead"]:
                self.groups["Quarantined"].remove_member(member)

            for member in new_members["newly_infected"]:
                self.groups["Infected"].add_member(member)
                if member.vaccinated:
                    self.groups["Vaccinated"].remove_member(member)
                    member.vaccinated = False

        def simulate_vaccinations():
            for member in new_members["staged_vaccinated"]:
                gen_params = lambda: {
                    "t_vac_effect": np.random.poisson(c_vac_effect),
                    "t_immunity": np.random.poisson(c_vac_immunity),
                    "t_wait_vac": t_wait_vac,
                    "t_wait_rec": t_wait_rec
                }

                if member.vaccinate(tick, gen_params()):
                    if member.recovered:
                        Group.move([member], self.groups["Recovered"], self.groups["Vaccinated"])
                        member.recovered = False

                    elif not member.vaccinated:
                        self.groups["Vaccinated"].add_member(member)

                    member.vaccinated = True

                else:
                    new_members["not_vaccinated"] += [member]

        def simulate_tests():
            def test_and_quarantine_procedure(member):
                result = False
                if not member.quarantined and tick != member._last_tested:
                    if (not self.settings["test_vaccinated"]) and "vaccinations" in member.properties.keys() and \
                            tick < member.properties["vaccinations"][-1][0] + self.settings["vaccination_immunity_time"]:
                        return False

                    result = member.test(tick)
                    results[result] += 1
                    if result:
                        member.quarantine(self.settings["quarantine_duration"])
                        self.groups["Quarantined"].add_member(member)

                return result

            def backtrack(member, depth):
                if depth <= 0 or np.random.uniform() > self.settings["backtracking_probability"]:
                    return

                for contact in member.recent_contacts:
                    if test_and_quarantine_procedure(contact):
                        backtrack(contact, depth - 1)

            n_tests = min(np.random.poisson(c_tests), self.population.size)
            results = [0, 0]
            for member in np.random.choice(self.population.members, size=n_tests, replace=False):
                if test_and_quarantine_procedure(member):
                    backtrack(member, self.settings["backtracking_depth"])

            self.stats["test_results_-"] += [results[0]]
            self.stats["test_results_+"] += [results[1]]

        def decide_measure(measure: str):
            if measure == "lockdown":
                # Would a lockdown be arranged/prolonged?
                if self.settings["start_lockdown_at"] <= self.stats["seven_day_incidence"][-1]:
                    result = True

                elif self.settings["end_lockdown_at"] >= self.stats["seven_day_incidence"][-1]:
                    result = False

                else:
                    result = self.arrange_lockdown

                # If it is (not) prolonged, is it still in the allowed duration interval?
                if result:
                    if self.lockdown_duration >= self.settings["maximum_lockdown_duration"]:
                        self.lockdown_duration = 0
                        self.lockdown_ended = tick
                        result = False

                else:
                    if self.arrange_lockdown and self.lockdown_duration < self.settings["minimum_lockdown_duration"]:
                        result = True

                    else:
                        if self.arrange_lockdown:
                            self.lockdown_ended = tick

                        self.lockdown_duration = 0

                # If a new lockdown is arranged, is it sufficiently spaced from the last?
                if result and not self.arrange_lockdown:
                    result = tick - self.lockdown_ended > self.settings["lockdown_gap"]

                # Are enough people vaccinated?
                if self.groups["Vaccinated"].size >= self.settings["heard_immunity"] * self.population.size:
                    result = self.arrange_lockdown

                self.lockdown_duration += result

                return result

            else:
                raise ValueError("Measure not available")

        def update_stats():
            def calc_7di():
                positive = self.stats["test_results_+"]
                if len(positive) >= 7:
                    return round(sum(positive[-7:]) * 100000 / self.population.size)
                else:
                    return round(sum(positive) * 7 / len(positive) * 100000 / self.population.size)

            self.stats["new_infected"] += [len(new_members["newly_infected"])]
            self.stats["new_recovered"] += [len(new_members["newly_recovered"])]
            self.stats["new_susceptible"] += [len(new_members["newly_susceptible"])]
            self.stats["new_vaccinated"] += [n_vacs - len(new_members["not_vaccinated"])]
            self.stats["new_dead"] += [len(new_members["new_dead"])]
            self.stats["ill_vaccinated"] += [len(new_members["newly_infected_vac"])]
            self.stats["seven_day_incidence"] += [calc_7di()]
            self.stats["in_lockdown"] += [1 if self.arrange_lockdown else 0]

        def print_stats():
            color = bcolors.FAIL if self.arrange_lockdown else bcolors.OKGREEN
            print(color + "\rDay: %04d, #Infected: %d, #Vaccinated: %d, #Quarantined: %d, #Dead: %d, 7di: %d\033[K"
                  % (tick,
                     self.groups["Infected"].size,
                     self.groups["Vaccinated"].size,
                     self.groups["Quarantined"].size,
                     self.groups["Dead"].size,
                     self.stats["seven_day_incidence"][-1]), end="")

        print("\nInitializing simulation...")

        # c -> put into poisson, n -> fixed value
        tick = 0
        infection_heuristic = self.settings["infection_probability_heuristic"]
        mortality_heuristic = self.settings["mortality_probability_heuristic"]
        vaccine_heuristic = self.settings["vaccine_failure_probability_heuristic"]
        c_inner = self.settings["inner_reproduction_number"]
        c_outer = self.settings["outer_reproduction_number"]
        n_ini_inf = self.settings["number_of_initially_infected"]
        n_ini_recs = self.settings["number_of_initially_recovered"]
        n_ini_vacs = self.settings["number_of_initially_vaccinated"]
        c_incubation = self.settings["incubation_time"]
        c_infection = self.settings["infection_time"]
        c_immunity = self.settings["recovered_immunity_time"]
        c_vac_effect = self.settings["vaccination_takes_effect_time"]
        c_vac_immunity = self.settings["vaccination_immunity_time"]
        c_vacs = self.settings["vaccinations_per_day"]
        c_tests = self.settings["tests_per_day"]
        t_wait_vac = self.settings["waiting_time_vaccinated_until_new_vaccination"]
        t_wait_rec = self.settings["waiting_time_recovered_until_vaccination"]
        t_vac_available = self.settings["vaccine_available_as_of"]
        max_t = self.settings["maximum_simulation_time_interval"]

        initialize_groups()

        print("Finished initializing simulation.\n\nStarting simulation...")

        print_stats()

        while True:
            tick += 1
            n_vacs = min(np.random.poisson(c_vacs), self.population.size)
            n_vacs = round(n_vacs * np.exp(-100 / (tick - t_vac_available)) if tick > t_vac_available else 0)

            self.arrange_lockdown = decide_measure("lockdown")

            new_members = {
                "newly_susceptible": [],
                "newly_infected": [],
                "newly_infected_vac": [],
                "newly_recovered": [],
                "newly_susceptible_rec": [],
                "newly_susceptible_vac": [],
                "staged_vaccinated": np.random.choice(self.population.members, size=n_vacs, replace=False),
                "not_vaccinated": [],
                "new_dead": []
            }

            for group in self.groups.values():
                simulate_group(group)

            move_members_to_new_groups()
            simulate_vaccinations()
            simulate_tests()

            for group in self.groups.values():
                group.counter.save_count()

            update_stats()
            print_stats()

            if self.groups["Infected"].size == 0 or tick >= max_t:
                break

        print(bcolors.ENDC + "\nFinished simulation.")

    def end_iteration(self):
        """
        Ends the simulation and saves the population data as
        population.json, the progression data as progression.csv
        and the used settings as settings.cfg in out/POPULATION_NAME/xxxx/.

        If 'override_newest' is set to True, then the latest saved simulation
        is overridden.
        """

        def set_out_path():
            path = ".." + sep + "out" + sep + self.population.name + sep
            if not os.path.exists(path):
                os.mkdir(path)

            newest_iteration = Standalones.get_last_folder(path)
            override = self.settings["override_newest"] \
                if "override_newest" in self.settings.keys() \
                else newest_iteration == "9999"

            if not newest_iteration:
                path += "0000" + sep
                os.mkdir(path)
            else:
                if override:
                    path += newest_iteration + sep
                else:
                    path += f"{int(newest_iteration) + 1 :04d}" + sep
                    os.mkdir(path)

            return path

        def save_disease_progression(path):
            header = ",".join([group.name for group in self.groups.values()] + list(self.stats.keys()))
            rows = np.array([group.history for group in self.groups.values()] +
                            [np.array(stat_values) for stat_values in self.stats.values()]).T
            np.savetxt(path + "progression.csv", rows, fmt='%d', delimiter=",", header=header, comments='')

        def save_options(path: str):
            shutil.copyfile("Settings" + sep + self.settings["file"], path + "settings.cfg")

        print("\nSaving simulation data...")

        out_path = set_out_path()
        self.population.save_as_json(out_path)
        save_disease_progression(out_path)
        save_options(out_path)

        if self.settings["override_newest"] and os.path.exists(out_path + "Plots"):
            shutil.rmtree(out_path + "Plots")

        print("Finished saving simulation data.")

    def change_settings(self, settings: dict):
        """
        Change the settings of the current simulation.

        If 'population_file' does not change, loading it is omitted.

        Parameters
        ----------
        settings : dict
            The dictionary containing all relevant settings for
            the simulation.
        """

        def check_settings():
            all_settings = Standalones.make_settings("Template.cfg")

            for property in all_settings.keys():
                if property not in settings.keys():
                    settings[property] = all_settings[property]

            if settings["start_lockdown_at"] < settings["end_lockdown_at"]:
                raise ValueError("end_lockdown_at must be smaller than start_lockdown_at")

            return settings

        self.settings = check_settings()

        if self.settings["population_file"] != settings["population_file"]:
            self.population = Population.load_from_file(self.settings["population_file"])

    def reset(self):
        """
        Resets simulation to its state of initialization.
        """

        self.population = self._population_init.copy()

        for group in self.groups.values():
            group.reset()

        for stat in self.stats.keys():
            self.stats[stat] = [0]

        self.arrange_lockdown = False
        self.lockdown_duration = 0
        self.lockdown_ended = -np.inf


################################################################################################
################################################################################################
################################################################################################


class Scenarios:
    """
    Class for simulating various scenarios
    """

    @staticmethod
    def single_simulation(settings: dict):
        """
        Run a single simulation with given settings.

        Parameters
        ----------
        settings : dict
            The dictionary containing all relevant settings for
            the simulation.

        Returns
        ----------
        Simulation
            The simulation object.
        """

        sim = Simulation(settings)
        sim.start_iteration()
        sim.end_iteration()

        return sim

    @staticmethod
    def c_inner_vs_c_outer(settings: dict, n: int = 5):
        """
        Creates a \\(n \\times n\\)-heatmap containing the peak infection values for
        each simulation in relation to the inner- and outer- reproduction numbers in
        the range from 0 to 5. Plot is saved as out/general/c_inner_vs_c_outer_nxn.png.

        Parameters
        ----------
        settings : dict
            The dictionary containing all relevant settings for
            the simulation.

        n : int
            Number of simulations along each axis.
            Defaulted to 5.
        """

        from matplotlib.colors import LinearSegmentedColormap, LogNorm
        custom = LinearSegmentedColormap.from_list('custom', ['g', 'yellow', 'r'], N=255)

        sim = Simulation(settings)
        max_infection_values = np.zeros(shape=(n, n))
        for x, c_i in enumerate(np.linspace(0, 5, n)):
            for y, c_o in enumerate(np.linspace(0, 5, n)):
                settings["inner_reproduction_number"] = c_i
                settings["outer_reproduction_number"] = c_o

                sim.reset()
                sim.change_settings(settings)
                sim.start_iteration()

                max_infection_values[y, x] = max(sim.groups["Infected"].history)

        plt.figure(figsize=(10, 10))
        plt.imshow(max_infection_values, cmap=custom, norm=LogNorm())
        plt.title("Maximum infection numbers in\nrelation to $c_{inner}$ and $c_{outer}$", pad=10)
        plt.xlabel("$c_{inner}$")
        plt.ylabel("$c_{outer}$")
        plt.xticks(ticks=range(0, n), labels=["%.1f" % i for i in np.linspace(0, 5, n)])
        plt.yticks(ticks=range(0, n), labels=["%.1f" % i for i in np.linspace(0, 5, n)])

        for (j, i), label in np.ndenumerate(max_infection_values):
            plt.text(i, j, int(label), ha='center', va='center')

        Standalones.check_existence(".." + sep + "out" + sep + "general")

        plt.savefig("../out/general/c_inner_vs_c_outer_%dx%d.png" % (n, n))
        plt.show()

        print(max_infection_values)

    @staticmethod
    def mitigation_interval(settings: dict, interval_boundaries: tuple = (1.5, 3), samples: int = 16, avg_over: int = 10):
        """
        Creates a plot containing the (average) peak infection values for simulations
        with varying outer reproduction numbers in the given range. Plot is saved as
        out/general/c_inner_vs_c_outer_nxn.png.

        Parameters
        ----------
        settings : dict
            The dictionary containing all relevant settings for
            the simulation.

        interval_boundaries : tuple
            Range of outer reproduction numbers to simulate.
            Defaulted to (1.5, 3).

        samples: int
            Number of (equidistant) values for outer reproduction number to
            be simulated.
            Defaulted to 16.

        avg_over: int
            Number of runs to average peak values over.
            Defaulted to 10.
        """

        sim = Simulation(settings)
        mitigation_interval = np.zeros(samples)
        interval = np.linspace(interval_boundaries[0], interval_boundaries[1], samples)

        for run in range(1, avg_over + 1):
            print("\n" * 25 + "Run %d" % run)
            for i, c_o in enumerate(interval):
                settings["outer_reproduction_number"] = c_o

                sim.reset()
                sim.change_settings(settings)
                sim.start_iteration()

                mitigation_interval[i] = (run - 1) / run * mitigation_interval[i] \
                                         + 1 / run * max(sim.groups["Infected"].history)

        Standalones.check_existence(".." + sep + "out" + sep + "general")

        plt.plot(interval, mitigation_interval, color='r')
        plt.title("Maximum infection numbers in relation to $c_{outer}$.")
        plt.xlabel("$c_{outer}$")
        plt.ylabel("maximum infections")
        plt.xticks(ticks=interval, labels=["%.2f" % i for i in interval])
        plt.xlim(interval_boundaries)
        plt.grid()
        plt.savefig("../out/general/mitigation%.2f-%.2f_%d.png" % (interval_boundaries[0], interval_boundaries[1], avg_over))
        plt.show()


################################################################################################
################################################################################################
################################################################################################


class PostProcessing:
    """
    Class for post processing generated simulation data
    """

    @staticmethod
    def infection_graph(folder: str):
        """
        Creates the infection-graph for the given simulation data and
        saves it as folder/Plots/Infection_graph.pdf.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_plot_elements():
            f = json.load(open(folder + "population.json"))
            member_id_dict = {member["id"]: i for i, member in enumerate(f["members"])}

            p = ProgressBar(0, len(f["members"]))
            for member in f["members"]:
                self = member_id_dict[member["id"]]
                if "infections" not in member.keys():
                    p.update(1)
                    continue

                for infection in member["infections"]:
                    infectant = member_id_dict[infection[0]]
                    first_day = infection[2]
                    last_day = infection[4]

                    plot_elements["Lines"] += [[[self, first_day], [self, last_day]]]

                    if infectant != self:
                        plot_elements["Lines"] += [[[self, first_day], [infectant, first_day - 1]]]
                        plot_elements["Infected"] += [[self, first_day]]

                    else:
                        plot_elements["Initials"] += [[self, 0]]

                p.update(1)

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        plot_elements = {"Initials": [], "Infected": [], "Lines": []}
        get_plot_elements()
        plot_elements["Initials"] = np.array(plot_elements["Initials"])
        plot_elements["Infected"] = np.array(plot_elements["Infected"])
        plot_elements["Lines"] = np.array(plot_elements["Lines"])

        ax = plt.gca()
        ax.add_collection(LineCollection(plot_elements["Lines"], color='r', alpha=0.2, linewidth=0.01))
        ax.plot(*plot_elements["Infected"][:, [0, 1]].T, color='r', marker='x', linestyle='None', markersize=0.01)
        ax.plot(*plot_elements["Initials"][:, [0, 1]].T, color='b', marker='x', linestyle='None', markersize=0.01)
        plt.xlabel('Population')
        plt.ylabel('Day')
        plt.savefig(folder + "Plots" + sep + "Infection_graph.pdf")
        plt.show()

    @staticmethod
    def progression_plots(folder: str):
        """
        Creates various plots showing the disease progression for the given simulation
        data and saves them as folder/Plots/PLOT_NAME.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def make_plot(plotname: str, title: str, datasets: iter, colors: iter, labels: iter = []):
            _, ax = plt.subplots()
            days = np.arange(0, len(datasets[0]), 1)

            for i, dataset in enumerate(datasets):
                ax.plot(dataset, color=colors[i])

            ax.fill_between(days, 0, 1, where=data["in_lockdown"], color='red', alpha=0.25,
                            transform=ax.get_xaxis_transform())
            ax.set_xlabel("t")
            ax.set_ylabel("#")
            ax.set_xlim(0, days[-1])
            ax.set_title(title)

            if labels:
                ax.legend(labels)

            plt.savefig(folder + "Plots" + sep + plotname)
            plt.show()

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        population_size = json.load(open(folder + "population.json"))["size"]
        data_stream = csv.DictReader(open(folder + "progression.csv"))
        data = {}
        for row in data_stream:
            for key, value in row.items():
                if key not in data.keys():
                    data[key] = []

                data[key] += [int(value)]

        data = {column: np.array(data[column]) for column in data.keys()}
        make_plot("SIRV.png", "Total",
                  [population_size - data["Infected"] - data["Recovered"] - data["Vaccinated"] - data["Dead"],
                   data["Infected"], data["Recovered"], data["Vaccinated"]],
                  ['green', 'red', 'blue', 'cyan'],
                  ["Susceptible", "Infected", "Recovered", "Vaccinated"])

        make_plot("NewI.png", "New Infections",
                  [data["new_infected"], data["ill_vaccinated"]],
                  ['red', 'purple'],
                  ["Regular", "Vaccinated"])

        make_plot("IR.png", "Infected & Recovered",
                  [data["Infected"], data["Recovered"]],
                  ['red', 'blue'],
                  ["Infected", "Recovered"])

        make_plot("V.png", "Vaccinated",
                  [data["Vaccinated"]],
                  ['cyan'],
                  ["Vaccinated"])

        make_plot("7DI.png", "Seven Day Incidence",
                  [data["seven_day_incidence"]],
                  ['red'])

        make_plot("D.png", "Dead",
                  [data["Dead"]],
                  ['black'])

    @staticmethod
    def compare_inner_and_outer_infection_numbers(folder: str):
        """
        Creates a bar plot comparing the relative numbers of infections that
        took place inside and outside of households and saves it as
        folder/Plots/inner_vs_outer.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_infection_data():
            f = json.load(open(folder + "population.json"))

            p = ProgressBar(0, len(f["members"]))
            for member in f["members"]:
                if "infections" not in member.keys():
                    p.update(1)
                    continue

                for infection in member["infections"]:
                    if infection[1]:
                        infection_data["inside"] += 1

                    else:
                        infection_data["outside"] += 1

                p.update(1)

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        infection_data = {"inside": 0, "outside": 0}
        get_infection_data()

        p_inside = infection_data["inside"] / sum(infection_data.values())

        plt.bar(*zip(*infection_data.items()))
        for i, v in enumerate([p_inside, 1 - p_inside]):
            plt.text(i, list(infection_data.values())[i] / 2, "%.1f%%" % (100 * v), color='black', ha='center',
                     va='center', fontsize=32)

        plt.ylabel('Total')
        plt.savefig(folder + "Plots" + sep + "inner_vs_outer.png")
        plt.show()

    @staticmethod
    def mean_latency_period(folder: str):
        """
        Creates a bar plot comparing the latency periods and
        evaluating the overall mean latency time and saves it as
        folder/Plots/latency_periods.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_infection_data():
            f = json.load(open(folder + "population.json"))

            p = ProgressBar(0, len(f["members"]))
            for member in f["members"]:
                if "infections" not in member.keys():
                    p.update(1)
                    continue

                for infection in member["infections"]:
                    if infection[0] != member["id"]:
                        if infection[0] not in infection_data.keys():
                            infection_data[infection[0]] = []

                        infection_data[infection[0]] += [int(infection[2])]

                p.update(1)

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        infection_data = {}
        get_infection_data()

        latency_periods = {}
        mean = 0.0
        for i, infections in enumerate(infection_data.values()):
            infections = sorted(infections)
            for j in range(len(infections) - 1):
                period = infections[j+1] - infections[j]

                if period < 30:
                    if period not in latency_periods.keys():
                        latency_periods[period] = 0

                    latency_periods[period] += 1
                    mean = i / (i + 1) * mean + period / (i + 1)

        plt.bar(*zip(*latency_periods.items()))
        plt.axvline(x=mean, color='r')
        plt.title("mean = %.2f" % mean)
        plt.xlabel("Latency period (days)")
        plt.ylabel("#")
        plt.savefig(folder + "Plots" + sep + "latency_periods.png")
        plt.show()

    @staticmethod
    def vaccine_quotas(folder: str):
        """
        Creates a bar plot showing how many people have received how
        many shots of the vaccine. Plot is saved as folder/Plots/vaccine_quotas.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_vaccination_data():
            f = json.load(open(folder + "population.json"))

            p = ProgressBar(0, len(f["members"]))
            for member in f["members"]:
                if "vaccinations" not in member.keys():
                    p.update(1)
                    continue

                n = len(member["vaccinations"])
                if n not in vaccination_data.keys():
                    vaccination_data[n] = 0

                vaccination_data[n] += 1
                p.update(1)

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        vaccination_data = {}
        get_vaccination_data()

        cumsum = np.sum(list(vaccination_data.values()))
        vaccination_data = {n: 100 * vs / cumsum for n, vs in vaccination_data.items()}

        plt.bar(*zip(*vaccination_data.items()))
        plt.xlabel("Vaccinations")
        plt.ylabel("%")
        plt.savefig(folder + "Plots" + sep + "vaccine_quotas.png")
        plt.show()

    @staticmethod
    def death_distribution(folder: str):
        """
        Creates a bar plot showing the age distribution of people
        who died saves the resulting plot as folder/Plots/death_distribution.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_death_data():
            f = json.load(open(folder + "population.json"))

            p = ProgressBar(0, len(f["members"]))
            for member in f["members"]:
                if "Death" not in member.keys():
                    p.update(1)
                    continue

                else:
                    age = int(member["age"])
                    if age not in death_data.keys():
                        death_data[age] = 0

                    death_data[age] += 1

                p.update(1)

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        death_data = {}
        get_death_data()

        plt.bar(*zip(*death_data.items()))
        plt.title("Deaths")
        plt.xlim(left=-1)
        plt.xlabel("age")
        plt.ylabel("#")
        plt.savefig(folder + "Plots" + sep + "death_distribution.png")
        plt.show()

    @staticmethod
    def effective_reproduction_number(folder: str):
        """
        Creates a plot showing the effective reproduction number over time
        and saves it as folder/Plots/latency_periods.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_infection_data():
            data_stream = csv.DictReader(open(folder + "progression.csv"))
            data = {}
            for row in data_stream:
                for key, value in row.items():
                    if key not in data.keys():
                        data[key] = []

                    data[key] += [int(value)]

            settings = Standalones.make_settings("settings.cfg", path=folder)

            return settings["incubation_time"] + settings["infection_time"], data["Infected"], data["new_infected"], len(data["Infected"])

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        mean_infection_time, infection_numbers, new_infection_numbers, duration = get_infection_data()

        effective_r_values = np.zeros(duration)
        for day in range(duration):
            maxd = min(day + mean_infection_time, duration)
            for timestamp in range(day, maxd):
                effective_r_values[day] += new_infection_numbers[timestamp]

        effective_r_values /= np.array(infection_numbers)
        max_r = max(effective_r_values)

        points = np.array([np.arange(duration), effective_r_values]).T.reshape(-1, 1, 2)
        segments = np.concatenate([points[:-1], points[1:]], axis=1)

        cmap = ListedColormap(['g', 'orange', 'r'])
        norm = BoundaryNorm([0, 0.8, 1.2, max(1.2, max_r)], cmap.N)
        lc = LineCollection(segments, cmap=cmap, norm=norm)
        lc.set_array(effective_r_values)

        plt.gca().add_collection(lc)
        plt.xlim([0, duration-1])
        plt.ylim([0, max_r + 0.25])
        plt.xlabel("Day")
        plt.ylabel("Effective Reproduction number")
        plt.savefig(folder + "Plots" + sep + "effective_reproduction_number.png")
        plt.show()


################################################################################################
################################################################################################
################################################################################################


class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

Classes

class PostProcessing

Class for post processing generated simulation data

Expand source code
class PostProcessing:
    """
    Class for post processing generated simulation data
    """

    @staticmethod
    def infection_graph(folder: str):
        """
        Creates the infection-graph for the given simulation data and
        saves it as folder/Plots/Infection_graph.pdf.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_plot_elements():
            f = json.load(open(folder + "population.json"))
            member_id_dict = {member["id"]: i for i, member in enumerate(f["members"])}

            p = ProgressBar(0, len(f["members"]))
            for member in f["members"]:
                self = member_id_dict[member["id"]]
                if "infections" not in member.keys():
                    p.update(1)
                    continue

                for infection in member["infections"]:
                    infectant = member_id_dict[infection[0]]
                    first_day = infection[2]
                    last_day = infection[4]

                    plot_elements["Lines"] += [[[self, first_day], [self, last_day]]]

                    if infectant != self:
                        plot_elements["Lines"] += [[[self, first_day], [infectant, first_day - 1]]]
                        plot_elements["Infected"] += [[self, first_day]]

                    else:
                        plot_elements["Initials"] += [[self, 0]]

                p.update(1)

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        plot_elements = {"Initials": [], "Infected": [], "Lines": []}
        get_plot_elements()
        plot_elements["Initials"] = np.array(plot_elements["Initials"])
        plot_elements["Infected"] = np.array(plot_elements["Infected"])
        plot_elements["Lines"] = np.array(plot_elements["Lines"])

        ax = plt.gca()
        ax.add_collection(LineCollection(plot_elements["Lines"], color='r', alpha=0.2, linewidth=0.01))
        ax.plot(*plot_elements["Infected"][:, [0, 1]].T, color='r', marker='x', linestyle='None', markersize=0.01)
        ax.plot(*plot_elements["Initials"][:, [0, 1]].T, color='b', marker='x', linestyle='None', markersize=0.01)
        plt.xlabel('Population')
        plt.ylabel('Day')
        plt.savefig(folder + "Plots" + sep + "Infection_graph.pdf")
        plt.show()

    @staticmethod
    def progression_plots(folder: str):
        """
        Creates various plots showing the disease progression for the given simulation
        data and saves them as folder/Plots/PLOT_NAME.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def make_plot(plotname: str, title: str, datasets: iter, colors: iter, labels: iter = []):
            _, ax = plt.subplots()
            days = np.arange(0, len(datasets[0]), 1)

            for i, dataset in enumerate(datasets):
                ax.plot(dataset, color=colors[i])

            ax.fill_between(days, 0, 1, where=data["in_lockdown"], color='red', alpha=0.25,
                            transform=ax.get_xaxis_transform())
            ax.set_xlabel("t")
            ax.set_ylabel("#")
            ax.set_xlim(0, days[-1])
            ax.set_title(title)

            if labels:
                ax.legend(labels)

            plt.savefig(folder + "Plots" + sep + plotname)
            plt.show()

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        population_size = json.load(open(folder + "population.json"))["size"]
        data_stream = csv.DictReader(open(folder + "progression.csv"))
        data = {}
        for row in data_stream:
            for key, value in row.items():
                if key not in data.keys():
                    data[key] = []

                data[key] += [int(value)]

        data = {column: np.array(data[column]) for column in data.keys()}
        make_plot("SIRV.png", "Total",
                  [population_size - data["Infected"] - data["Recovered"] - data["Vaccinated"] - data["Dead"],
                   data["Infected"], data["Recovered"], data["Vaccinated"]],
                  ['green', 'red', 'blue', 'cyan'],
                  ["Susceptible", "Infected", "Recovered", "Vaccinated"])

        make_plot("NewI.png", "New Infections",
                  [data["new_infected"], data["ill_vaccinated"]],
                  ['red', 'purple'],
                  ["Regular", "Vaccinated"])

        make_plot("IR.png", "Infected & Recovered",
                  [data["Infected"], data["Recovered"]],
                  ['red', 'blue'],
                  ["Infected", "Recovered"])

        make_plot("V.png", "Vaccinated",
                  [data["Vaccinated"]],
                  ['cyan'],
                  ["Vaccinated"])

        make_plot("7DI.png", "Seven Day Incidence",
                  [data["seven_day_incidence"]],
                  ['red'])

        make_plot("D.png", "Dead",
                  [data["Dead"]],
                  ['black'])

    @staticmethod
    def compare_inner_and_outer_infection_numbers(folder: str):
        """
        Creates a bar plot comparing the relative numbers of infections that
        took place inside and outside of households and saves it as
        folder/Plots/inner_vs_outer.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_infection_data():
            f = json.load(open(folder + "population.json"))

            p = ProgressBar(0, len(f["members"]))
            for member in f["members"]:
                if "infections" not in member.keys():
                    p.update(1)
                    continue

                for infection in member["infections"]:
                    if infection[1]:
                        infection_data["inside"] += 1

                    else:
                        infection_data["outside"] += 1

                p.update(1)

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        infection_data = {"inside": 0, "outside": 0}
        get_infection_data()

        p_inside = infection_data["inside"] / sum(infection_data.values())

        plt.bar(*zip(*infection_data.items()))
        for i, v in enumerate([p_inside, 1 - p_inside]):
            plt.text(i, list(infection_data.values())[i] / 2, "%.1f%%" % (100 * v), color='black', ha='center',
                     va='center', fontsize=32)

        plt.ylabel('Total')
        plt.savefig(folder + "Plots" + sep + "inner_vs_outer.png")
        plt.show()

    @staticmethod
    def mean_latency_period(folder: str):
        """
        Creates a bar plot comparing the latency periods and
        evaluating the overall mean latency time and saves it as
        folder/Plots/latency_periods.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_infection_data():
            f = json.load(open(folder + "population.json"))

            p = ProgressBar(0, len(f["members"]))
            for member in f["members"]:
                if "infections" not in member.keys():
                    p.update(1)
                    continue

                for infection in member["infections"]:
                    if infection[0] != member["id"]:
                        if infection[0] not in infection_data.keys():
                            infection_data[infection[0]] = []

                        infection_data[infection[0]] += [int(infection[2])]

                p.update(1)

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        infection_data = {}
        get_infection_data()

        latency_periods = {}
        mean = 0.0
        for i, infections in enumerate(infection_data.values()):
            infections = sorted(infections)
            for j in range(len(infections) - 1):
                period = infections[j+1] - infections[j]

                if period < 30:
                    if period not in latency_periods.keys():
                        latency_periods[period] = 0

                    latency_periods[period] += 1
                    mean = i / (i + 1) * mean + period / (i + 1)

        plt.bar(*zip(*latency_periods.items()))
        plt.axvline(x=mean, color='r')
        plt.title("mean = %.2f" % mean)
        plt.xlabel("Latency period (days)")
        plt.ylabel("#")
        plt.savefig(folder + "Plots" + sep + "latency_periods.png")
        plt.show()

    @staticmethod
    def vaccine_quotas(folder: str):
        """
        Creates a bar plot showing how many people have received how
        many shots of the vaccine. Plot is saved as folder/Plots/vaccine_quotas.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_vaccination_data():
            f = json.load(open(folder + "population.json"))

            p = ProgressBar(0, len(f["members"]))
            for member in f["members"]:
                if "vaccinations" not in member.keys():
                    p.update(1)
                    continue

                n = len(member["vaccinations"])
                if n not in vaccination_data.keys():
                    vaccination_data[n] = 0

                vaccination_data[n] += 1
                p.update(1)

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        vaccination_data = {}
        get_vaccination_data()

        cumsum = np.sum(list(vaccination_data.values()))
        vaccination_data = {n: 100 * vs / cumsum for n, vs in vaccination_data.items()}

        plt.bar(*zip(*vaccination_data.items()))
        plt.xlabel("Vaccinations")
        plt.ylabel("%")
        plt.savefig(folder + "Plots" + sep + "vaccine_quotas.png")
        plt.show()

    @staticmethod
    def death_distribution(folder: str):
        """
        Creates a bar plot showing the age distribution of people
        who died saves the resulting plot as folder/Plots/death_distribution.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_death_data():
            f = json.load(open(folder + "population.json"))

            p = ProgressBar(0, len(f["members"]))
            for member in f["members"]:
                if "Death" not in member.keys():
                    p.update(1)
                    continue

                else:
                    age = int(member["age"])
                    if age not in death_data.keys():
                        death_data[age] = 0

                    death_data[age] += 1

                p.update(1)

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        death_data = {}
        get_death_data()

        plt.bar(*zip(*death_data.items()))
        plt.title("Deaths")
        plt.xlim(left=-1)
        plt.xlabel("age")
        plt.ylabel("#")
        plt.savefig(folder + "Plots" + sep + "death_distribution.png")
        plt.show()

    @staticmethod
    def effective_reproduction_number(folder: str):
        """
        Creates a plot showing the effective reproduction number over time
        and saves it as folder/Plots/latency_periods.png.

        Parameters
        ----------
        folder : Folder with simulation data to process
        """

        def get_infection_data():
            data_stream = csv.DictReader(open(folder + "progression.csv"))
            data = {}
            for row in data_stream:
                for key, value in row.items():
                    if key not in data.keys():
                        data[key] = []

                    data[key] += [int(value)]

            settings = Standalones.make_settings("settings.cfg", path=folder)

            return settings["incubation_time"] + settings["infection_time"], data["Infected"], data["new_infected"], len(data["Infected"])

        if folder[-1] != sep:
            folder += sep

        Standalones.check_existence(folder + "Plots")

        mean_infection_time, infection_numbers, new_infection_numbers, duration = get_infection_data()

        effective_r_values = np.zeros(duration)
        for day in range(duration):
            maxd = min(day + mean_infection_time, duration)
            for timestamp in range(day, maxd):
                effective_r_values[day] += new_infection_numbers[timestamp]

        effective_r_values /= np.array(infection_numbers)
        max_r = max(effective_r_values)

        points = np.array([np.arange(duration), effective_r_values]).T.reshape(-1, 1, 2)
        segments = np.concatenate([points[:-1], points[1:]], axis=1)

        cmap = ListedColormap(['g', 'orange', 'r'])
        norm = BoundaryNorm([0, 0.8, 1.2, max(1.2, max_r)], cmap.N)
        lc = LineCollection(segments, cmap=cmap, norm=norm)
        lc.set_array(effective_r_values)

        plt.gca().add_collection(lc)
        plt.xlim([0, duration-1])
        plt.ylim([0, max_r + 0.25])
        plt.xlabel("Day")
        plt.ylabel("Effective Reproduction number")
        plt.savefig(folder + "Plots" + sep + "effective_reproduction_number.png")
        plt.show()

Static methods

def compare_inner_and_outer_infection_numbers(folder: str)

Creates a bar plot comparing the relative numbers of infections that took place inside and outside of households and saves it as folder/Plots/inner_vs_outer.png.

Parameters

folder : Folder with simulation data to process
 
Expand source code
@staticmethod
def compare_inner_and_outer_infection_numbers(folder: str):
    """
    Creates a bar plot comparing the relative numbers of infections that
    took place inside and outside of households and saves it as
    folder/Plots/inner_vs_outer.png.

    Parameters
    ----------
    folder : Folder with simulation data to process
    """

    def get_infection_data():
        f = json.load(open(folder + "population.json"))

        p = ProgressBar(0, len(f["members"]))
        for member in f["members"]:
            if "infections" not in member.keys():
                p.update(1)
                continue

            for infection in member["infections"]:
                if infection[1]:
                    infection_data["inside"] += 1

                else:
                    infection_data["outside"] += 1

            p.update(1)

    if folder[-1] != sep:
        folder += sep

    Standalones.check_existence(folder + "Plots")

    infection_data = {"inside": 0, "outside": 0}
    get_infection_data()

    p_inside = infection_data["inside"] / sum(infection_data.values())

    plt.bar(*zip(*infection_data.items()))
    for i, v in enumerate([p_inside, 1 - p_inside]):
        plt.text(i, list(infection_data.values())[i] / 2, "%.1f%%" % (100 * v), color='black', ha='center',
                 va='center', fontsize=32)

    plt.ylabel('Total')
    plt.savefig(folder + "Plots" + sep + "inner_vs_outer.png")
    plt.show()
def death_distribution(folder: str)

Creates a bar plot showing the age distribution of people who died saves the resulting plot as folder/Plots/death_distribution.png.

Parameters

folder : Folder with simulation data to process
 
Expand source code
@staticmethod
def death_distribution(folder: str):
    """
    Creates a bar plot showing the age distribution of people
    who died saves the resulting plot as folder/Plots/death_distribution.png.

    Parameters
    ----------
    folder : Folder with simulation data to process
    """

    def get_death_data():
        f = json.load(open(folder + "population.json"))

        p = ProgressBar(0, len(f["members"]))
        for member in f["members"]:
            if "Death" not in member.keys():
                p.update(1)
                continue

            else:
                age = int(member["age"])
                if age not in death_data.keys():
                    death_data[age] = 0

                death_data[age] += 1

            p.update(1)

    if folder[-1] != sep:
        folder += sep

    Standalones.check_existence(folder + "Plots")

    death_data = {}
    get_death_data()

    plt.bar(*zip(*death_data.items()))
    plt.title("Deaths")
    plt.xlim(left=-1)
    plt.xlabel("age")
    plt.ylabel("#")
    plt.savefig(folder + "Plots" + sep + "death_distribution.png")
    plt.show()
def effective_reproduction_number(folder: str)

Creates a plot showing the effective reproduction number over time and saves it as folder/Plots/latency_periods.png.

Parameters

folder : Folder with simulation data to process
 
Expand source code
@staticmethod
def effective_reproduction_number(folder: str):
    """
    Creates a plot showing the effective reproduction number over time
    and saves it as folder/Plots/latency_periods.png.

    Parameters
    ----------
    folder : Folder with simulation data to process
    """

    def get_infection_data():
        data_stream = csv.DictReader(open(folder + "progression.csv"))
        data = {}
        for row in data_stream:
            for key, value in row.items():
                if key not in data.keys():
                    data[key] = []

                data[key] += [int(value)]

        settings = Standalones.make_settings("settings.cfg", path=folder)

        return settings["incubation_time"] + settings["infection_time"], data["Infected"], data["new_infected"], len(data["Infected"])

    if folder[-1] != sep:
        folder += sep

    Standalones.check_existence(folder + "Plots")

    mean_infection_time, infection_numbers, new_infection_numbers, duration = get_infection_data()

    effective_r_values = np.zeros(duration)
    for day in range(duration):
        maxd = min(day + mean_infection_time, duration)
        for timestamp in range(day, maxd):
            effective_r_values[day] += new_infection_numbers[timestamp]

    effective_r_values /= np.array(infection_numbers)
    max_r = max(effective_r_values)

    points = np.array([np.arange(duration), effective_r_values]).T.reshape(-1, 1, 2)
    segments = np.concatenate([points[:-1], points[1:]], axis=1)

    cmap = ListedColormap(['g', 'orange', 'r'])
    norm = BoundaryNorm([0, 0.8, 1.2, max(1.2, max_r)], cmap.N)
    lc = LineCollection(segments, cmap=cmap, norm=norm)
    lc.set_array(effective_r_values)

    plt.gca().add_collection(lc)
    plt.xlim([0, duration-1])
    plt.ylim([0, max_r + 0.25])
    plt.xlabel("Day")
    plt.ylabel("Effective Reproduction number")
    plt.savefig(folder + "Plots" + sep + "effective_reproduction_number.png")
    plt.show()
def infection_graph(folder: str)

Creates the infection-graph for the given simulation data and saves it as folder/Plots/Infection_graph.pdf.

Parameters

folder : Folder with simulation data to process
 
Expand source code
@staticmethod
def infection_graph(folder: str):
    """
    Creates the infection-graph for the given simulation data and
    saves it as folder/Plots/Infection_graph.pdf.

    Parameters
    ----------
    folder : Folder with simulation data to process
    """

    def get_plot_elements():
        f = json.load(open(folder + "population.json"))
        member_id_dict = {member["id"]: i for i, member in enumerate(f["members"])}

        p = ProgressBar(0, len(f["members"]))
        for member in f["members"]:
            self = member_id_dict[member["id"]]
            if "infections" not in member.keys():
                p.update(1)
                continue

            for infection in member["infections"]:
                infectant = member_id_dict[infection[0]]
                first_day = infection[2]
                last_day = infection[4]

                plot_elements["Lines"] += [[[self, first_day], [self, last_day]]]

                if infectant != self:
                    plot_elements["Lines"] += [[[self, first_day], [infectant, first_day - 1]]]
                    plot_elements["Infected"] += [[self, first_day]]

                else:
                    plot_elements["Initials"] += [[self, 0]]

            p.update(1)

    if folder[-1] != sep:
        folder += sep

    Standalones.check_existence(folder + "Plots")

    plot_elements = {"Initials": [], "Infected": [], "Lines": []}
    get_plot_elements()
    plot_elements["Initials"] = np.array(plot_elements["Initials"])
    plot_elements["Infected"] = np.array(plot_elements["Infected"])
    plot_elements["Lines"] = np.array(plot_elements["Lines"])

    ax = plt.gca()
    ax.add_collection(LineCollection(plot_elements["Lines"], color='r', alpha=0.2, linewidth=0.01))
    ax.plot(*plot_elements["Infected"][:, [0, 1]].T, color='r', marker='x', linestyle='None', markersize=0.01)
    ax.plot(*plot_elements["Initials"][:, [0, 1]].T, color='b', marker='x', linestyle='None', markersize=0.01)
    plt.xlabel('Population')
    plt.ylabel('Day')
    plt.savefig(folder + "Plots" + sep + "Infection_graph.pdf")
    plt.show()
def mean_latency_period(folder: str)

Creates a bar plot comparing the latency periods and evaluating the overall mean latency time and saves it as folder/Plots/latency_periods.png.

Parameters

folder : Folder with simulation data to process
 
Expand source code
@staticmethod
def mean_latency_period(folder: str):
    """
    Creates a bar plot comparing the latency periods and
    evaluating the overall mean latency time and saves it as
    folder/Plots/latency_periods.png.

    Parameters
    ----------
    folder : Folder with simulation data to process
    """

    def get_infection_data():
        f = json.load(open(folder + "population.json"))

        p = ProgressBar(0, len(f["members"]))
        for member in f["members"]:
            if "infections" not in member.keys():
                p.update(1)
                continue

            for infection in member["infections"]:
                if infection[0] != member["id"]:
                    if infection[0] not in infection_data.keys():
                        infection_data[infection[0]] = []

                    infection_data[infection[0]] += [int(infection[2])]

            p.update(1)

    if folder[-1] != sep:
        folder += sep

    Standalones.check_existence(folder + "Plots")

    infection_data = {}
    get_infection_data()

    latency_periods = {}
    mean = 0.0
    for i, infections in enumerate(infection_data.values()):
        infections = sorted(infections)
        for j in range(len(infections) - 1):
            period = infections[j+1] - infections[j]

            if period < 30:
                if period not in latency_periods.keys():
                    latency_periods[period] = 0

                latency_periods[period] += 1
                mean = i / (i + 1) * mean + period / (i + 1)

    plt.bar(*zip(*latency_periods.items()))
    plt.axvline(x=mean, color='r')
    plt.title("mean = %.2f" % mean)
    plt.xlabel("Latency period (days)")
    plt.ylabel("#")
    plt.savefig(folder + "Plots" + sep + "latency_periods.png")
    plt.show()
def progression_plots(folder: str)

Creates various plots showing the disease progression for the given simulation data and saves them as folder/Plots/PLOT_NAME.png.

Parameters

folder : Folder with simulation data to process
 
Expand source code
@staticmethod
def progression_plots(folder: str):
    """
    Creates various plots showing the disease progression for the given simulation
    data and saves them as folder/Plots/PLOT_NAME.png.

    Parameters
    ----------
    folder : Folder with simulation data to process
    """

    def make_plot(plotname: str, title: str, datasets: iter, colors: iter, labels: iter = []):
        _, ax = plt.subplots()
        days = np.arange(0, len(datasets[0]), 1)

        for i, dataset in enumerate(datasets):
            ax.plot(dataset, color=colors[i])

        ax.fill_between(days, 0, 1, where=data["in_lockdown"], color='red', alpha=0.25,
                        transform=ax.get_xaxis_transform())
        ax.set_xlabel("t")
        ax.set_ylabel("#")
        ax.set_xlim(0, days[-1])
        ax.set_title(title)

        if labels:
            ax.legend(labels)

        plt.savefig(folder + "Plots" + sep + plotname)
        plt.show()

    if folder[-1] != sep:
        folder += sep

    Standalones.check_existence(folder + "Plots")

    population_size = json.load(open(folder + "population.json"))["size"]
    data_stream = csv.DictReader(open(folder + "progression.csv"))
    data = {}
    for row in data_stream:
        for key, value in row.items():
            if key not in data.keys():
                data[key] = []

            data[key] += [int(value)]

    data = {column: np.array(data[column]) for column in data.keys()}
    make_plot("SIRV.png", "Total",
              [population_size - data["Infected"] - data["Recovered"] - data["Vaccinated"] - data["Dead"],
               data["Infected"], data["Recovered"], data["Vaccinated"]],
              ['green', 'red', 'blue', 'cyan'],
              ["Susceptible", "Infected", "Recovered", "Vaccinated"])

    make_plot("NewI.png", "New Infections",
              [data["new_infected"], data["ill_vaccinated"]],
              ['red', 'purple'],
              ["Regular", "Vaccinated"])

    make_plot("IR.png", "Infected & Recovered",
              [data["Infected"], data["Recovered"]],
              ['red', 'blue'],
              ["Infected", "Recovered"])

    make_plot("V.png", "Vaccinated",
              [data["Vaccinated"]],
              ['cyan'],
              ["Vaccinated"])

    make_plot("7DI.png", "Seven Day Incidence",
              [data["seven_day_incidence"]],
              ['red'])

    make_plot("D.png", "Dead",
              [data["Dead"]],
              ['black'])
def vaccine_quotas(folder: str)

Creates a bar plot showing how many people have received how many shots of the vaccine. Plot is saved as folder/Plots/vaccine_quotas.png.

Parameters

folder : Folder with simulation data to process
 
Expand source code
@staticmethod
def vaccine_quotas(folder: str):
    """
    Creates a bar plot showing how many people have received how
    many shots of the vaccine. Plot is saved as folder/Plots/vaccine_quotas.png.

    Parameters
    ----------
    folder : Folder with simulation data to process
    """

    def get_vaccination_data():
        f = json.load(open(folder + "population.json"))

        p = ProgressBar(0, len(f["members"]))
        for member in f["members"]:
            if "vaccinations" not in member.keys():
                p.update(1)
                continue

            n = len(member["vaccinations"])
            if n not in vaccination_data.keys():
                vaccination_data[n] = 0

            vaccination_data[n] += 1
            p.update(1)

    if folder[-1] != sep:
        folder += sep

    Standalones.check_existence(folder + "Plots")

    vaccination_data = {}
    get_vaccination_data()

    cumsum = np.sum(list(vaccination_data.values()))
    vaccination_data = {n: 100 * vs / cumsum for n, vs in vaccination_data.items()}

    plt.bar(*zip(*vaccination_data.items()))
    plt.xlabel("Vaccinations")
    plt.ylabel("%")
    plt.savefig(folder + "Plots" + sep + "vaccine_quotas.png")
    plt.show()
class Scenarios

Class for simulating various scenarios

Expand source code
class Scenarios:
    """
    Class for simulating various scenarios
    """

    @staticmethod
    def single_simulation(settings: dict):
        """
        Run a single simulation with given settings.

        Parameters
        ----------
        settings : dict
            The dictionary containing all relevant settings for
            the simulation.

        Returns
        ----------
        Simulation
            The simulation object.
        """

        sim = Simulation(settings)
        sim.start_iteration()
        sim.end_iteration()

        return sim

    @staticmethod
    def c_inner_vs_c_outer(settings: dict, n: int = 5):
        """
        Creates a \\(n \\times n\\)-heatmap containing the peak infection values for
        each simulation in relation to the inner- and outer- reproduction numbers in
        the range from 0 to 5. Plot is saved as out/general/c_inner_vs_c_outer_nxn.png.

        Parameters
        ----------
        settings : dict
            The dictionary containing all relevant settings for
            the simulation.

        n : int
            Number of simulations along each axis.
            Defaulted to 5.
        """

        from matplotlib.colors import LinearSegmentedColormap, LogNorm
        custom = LinearSegmentedColormap.from_list('custom', ['g', 'yellow', 'r'], N=255)

        sim = Simulation(settings)
        max_infection_values = np.zeros(shape=(n, n))
        for x, c_i in enumerate(np.linspace(0, 5, n)):
            for y, c_o in enumerate(np.linspace(0, 5, n)):
                settings["inner_reproduction_number"] = c_i
                settings["outer_reproduction_number"] = c_o

                sim.reset()
                sim.change_settings(settings)
                sim.start_iteration()

                max_infection_values[y, x] = max(sim.groups["Infected"].history)

        plt.figure(figsize=(10, 10))
        plt.imshow(max_infection_values, cmap=custom, norm=LogNorm())
        plt.title("Maximum infection numbers in\nrelation to $c_{inner}$ and $c_{outer}$", pad=10)
        plt.xlabel("$c_{inner}$")
        plt.ylabel("$c_{outer}$")
        plt.xticks(ticks=range(0, n), labels=["%.1f" % i for i in np.linspace(0, 5, n)])
        plt.yticks(ticks=range(0, n), labels=["%.1f" % i for i in np.linspace(0, 5, n)])

        for (j, i), label in np.ndenumerate(max_infection_values):
            plt.text(i, j, int(label), ha='center', va='center')

        Standalones.check_existence(".." + sep + "out" + sep + "general")

        plt.savefig("../out/general/c_inner_vs_c_outer_%dx%d.png" % (n, n))
        plt.show()

        print(max_infection_values)

    @staticmethod
    def mitigation_interval(settings: dict, interval_boundaries: tuple = (1.5, 3), samples: int = 16, avg_over: int = 10):
        """
        Creates a plot containing the (average) peak infection values for simulations
        with varying outer reproduction numbers in the given range. Plot is saved as
        out/general/c_inner_vs_c_outer_nxn.png.

        Parameters
        ----------
        settings : dict
            The dictionary containing all relevant settings for
            the simulation.

        interval_boundaries : tuple
            Range of outer reproduction numbers to simulate.
            Defaulted to (1.5, 3).

        samples: int
            Number of (equidistant) values for outer reproduction number to
            be simulated.
            Defaulted to 16.

        avg_over: int
            Number of runs to average peak values over.
            Defaulted to 10.
        """

        sim = Simulation(settings)
        mitigation_interval = np.zeros(samples)
        interval = np.linspace(interval_boundaries[0], interval_boundaries[1], samples)

        for run in range(1, avg_over + 1):
            print("\n" * 25 + "Run %d" % run)
            for i, c_o in enumerate(interval):
                settings["outer_reproduction_number"] = c_o

                sim.reset()
                sim.change_settings(settings)
                sim.start_iteration()

                mitigation_interval[i] = (run - 1) / run * mitigation_interval[i] \
                                         + 1 / run * max(sim.groups["Infected"].history)

        Standalones.check_existence(".." + sep + "out" + sep + "general")

        plt.plot(interval, mitigation_interval, color='r')
        plt.title("Maximum infection numbers in relation to $c_{outer}$.")
        plt.xlabel("$c_{outer}$")
        plt.ylabel("maximum infections")
        plt.xticks(ticks=interval, labels=["%.2f" % i for i in interval])
        plt.xlim(interval_boundaries)
        plt.grid()
        plt.savefig("../out/general/mitigation%.2f-%.2f_%d.png" % (interval_boundaries[0], interval_boundaries[1], avg_over))
        plt.show()

Static methods

def c_inner_vs_c_outer(settings: dict, n: int = 5)

Creates a n \times n-heatmap containing the peak infection values for each simulation in relation to the inner- and outer- reproduction numbers in the range from 0 to 5. Plot is saved as out/general/c_inner_vs_c_outer_nxn.png.

Parameters

settings : dict
The dictionary containing all relevant settings for the simulation.
n : int
Number of simulations along each axis. Defaulted to 5.
Expand source code
@staticmethod
def c_inner_vs_c_outer(settings: dict, n: int = 5):
    """
    Creates a \\(n \\times n\\)-heatmap containing the peak infection values for
    each simulation in relation to the inner- and outer- reproduction numbers in
    the range from 0 to 5. Plot is saved as out/general/c_inner_vs_c_outer_nxn.png.

    Parameters
    ----------
    settings : dict
        The dictionary containing all relevant settings for
        the simulation.

    n : int
        Number of simulations along each axis.
        Defaulted to 5.
    """

    from matplotlib.colors import LinearSegmentedColormap, LogNorm
    custom = LinearSegmentedColormap.from_list('custom', ['g', 'yellow', 'r'], N=255)

    sim = Simulation(settings)
    max_infection_values = np.zeros(shape=(n, n))
    for x, c_i in enumerate(np.linspace(0, 5, n)):
        for y, c_o in enumerate(np.linspace(0, 5, n)):
            settings["inner_reproduction_number"] = c_i
            settings["outer_reproduction_number"] = c_o

            sim.reset()
            sim.change_settings(settings)
            sim.start_iteration()

            max_infection_values[y, x] = max(sim.groups["Infected"].history)

    plt.figure(figsize=(10, 10))
    plt.imshow(max_infection_values, cmap=custom, norm=LogNorm())
    plt.title("Maximum infection numbers in\nrelation to $c_{inner}$ and $c_{outer}$", pad=10)
    plt.xlabel("$c_{inner}$")
    plt.ylabel("$c_{outer}$")
    plt.xticks(ticks=range(0, n), labels=["%.1f" % i for i in np.linspace(0, 5, n)])
    plt.yticks(ticks=range(0, n), labels=["%.1f" % i for i in np.linspace(0, 5, n)])

    for (j, i), label in np.ndenumerate(max_infection_values):
        plt.text(i, j, int(label), ha='center', va='center')

    Standalones.check_existence(".." + sep + "out" + sep + "general")

    plt.savefig("../out/general/c_inner_vs_c_outer_%dx%d.png" % (n, n))
    plt.show()

    print(max_infection_values)
def mitigation_interval(settings: dict, interval_boundaries: tuple = (1.5, 3), samples: int = 16, avg_over: int = 10)

Creates a plot containing the (average) peak infection values for simulations with varying outer reproduction numbers in the given range. Plot is saved as out/general/c_inner_vs_c_outer_nxn.png.

Parameters

settings : dict
The dictionary containing all relevant settings for the simulation.
interval_boundaries : tuple
Range of outer reproduction numbers to simulate. Defaulted to (1.5, 3).
samples : int
Number of (equidistant) values for outer reproduction number to be simulated. Defaulted to 16.
avg_over : int
Number of runs to average peak values over. Defaulted to 10.
Expand source code
@staticmethod
def mitigation_interval(settings: dict, interval_boundaries: tuple = (1.5, 3), samples: int = 16, avg_over: int = 10):
    """
    Creates a plot containing the (average) peak infection values for simulations
    with varying outer reproduction numbers in the given range. Plot is saved as
    out/general/c_inner_vs_c_outer_nxn.png.

    Parameters
    ----------
    settings : dict
        The dictionary containing all relevant settings for
        the simulation.

    interval_boundaries : tuple
        Range of outer reproduction numbers to simulate.
        Defaulted to (1.5, 3).

    samples: int
        Number of (equidistant) values for outer reproduction number to
        be simulated.
        Defaulted to 16.

    avg_over: int
        Number of runs to average peak values over.
        Defaulted to 10.
    """

    sim = Simulation(settings)
    mitigation_interval = np.zeros(samples)
    interval = np.linspace(interval_boundaries[0], interval_boundaries[1], samples)

    for run in range(1, avg_over + 1):
        print("\n" * 25 + "Run %d" % run)
        for i, c_o in enumerate(interval):
            settings["outer_reproduction_number"] = c_o

            sim.reset()
            sim.change_settings(settings)
            sim.start_iteration()

            mitigation_interval[i] = (run - 1) / run * mitigation_interval[i] \
                                     + 1 / run * max(sim.groups["Infected"].history)

    Standalones.check_existence(".." + sep + "out" + sep + "general")

    plt.plot(interval, mitigation_interval, color='r')
    plt.title("Maximum infection numbers in relation to $c_{outer}$.")
    plt.xlabel("$c_{outer}$")
    plt.ylabel("maximum infections")
    plt.xticks(ticks=interval, labels=["%.2f" % i for i in interval])
    plt.xlim(interval_boundaries)
    plt.grid()
    plt.savefig("../out/general/mitigation%.2f-%.2f_%d.png" % (interval_boundaries[0], interval_boundaries[1], avg_over))
    plt.show()
def single_simulation(settings: dict)

Run a single simulation with given settings.

Parameters

settings : dict
The dictionary containing all relevant settings for the simulation.

Returns

Simulation
The simulation object.
Expand source code
@staticmethod
def single_simulation(settings: dict):
    """
    Run a single simulation with given settings.

    Parameters
    ----------
    settings : dict
        The dictionary containing all relevant settings for
        the simulation.

    Returns
    ----------
    Simulation
        The simulation object.
    """

    sim = Simulation(settings)
    sim.start_iteration()
    sim.end_iteration()

    return sim
class Simulation (settings: dict)

Create a Simulation object with the given settings.

Parameters

settings : dict
The dictionary containing all relevant settings for the simulation.
Expand source code
class Simulation:
    def __init__(self, settings: dict):
        """
        Create a Simulation object with the given settings.

        Parameters
        ----------
        settings : dict
            The dictionary containing all relevant settings for
            the simulation.
        """

        self.settings = None
        self.change_settings(settings)

        self.population = Population.load_from_file(self.settings["population_file"])
        self._population_init = self.population.copy()
        self.groups = {"Infected": Group("Infected"),
                       "Recovered": Group("Recovered"),
                       "Vaccinated": Group("Vaccinated"),
                       "Dead": Group("Dead"),
                       "Quarantined": Group("Quarantined")}
        self.stats = {"new_infected": [0],
                      "new_recovered": [0],
                      "new_susceptible": [0],
                      "new_vaccinated": [0],
                      "new_dead": [0],
                      "ill_vaccinated": [0],
                      "test_results_-": [0],
                      "test_results_+": [0],
                      "seven_day_incidence": [0],
                      "in_lockdown": [0]
                      }
        self.arrange_lockdown = False
        self.lockdown_duration = 0
        self.lockdown_ended = -np.inf

    def start_iteration(self):
        """
        Starts the simulation and runs it until infection numbers
        drop to zero or time given in 'maximum_simulation_time_interval'
        is reached.
        """

        def initialize_groups():
            def put_inits_in_respective_group():
                gen_params = lambda: {
                    "incubation_period": -1,
                    "infection_period": np.random.poisson(c_infection),
                    "immunity_period": np.random.poisson(c_immunity)
                }

                for ini_inf in ini_infs:
                    ini_inf.infect(ini_inf, 0, gen_params())
                    self.groups["Infected"].add_member(ini_inf)

                for ini_rec in ini_recs:
                    ini_rec.make_immune(np.random.poisson(c_immunity))
                    self.groups["Recovered"].add_member(ini_rec)
                    ini_rec.recovered = True

                for ini_vac in ini_vacs:
                    ini_vac.make_immune(np.random.poisson(c_vac_immunity))
                    self.groups["Vaccinated"].add_member(ini_vac)
                    ini_vac.vaccinated = True

                for group in self.groups.values():
                    group.counter.save_count()
                    group.counter.squash_history()

            infs_recs_vacs = np.random.choice(self.population.members,
                                              size=n_ini_inf + n_ini_recs + n_ini_vacs,
                                              replace=False)

            ini_infs = infs_recs_vacs[:n_ini_inf]
            ini_recs = infs_recs_vacs[n_ini_inf:n_ini_inf + n_ini_recs]
            ini_vacs = infs_recs_vacs[n_ini_inf + n_ini_recs:]

            put_inits_in_respective_group()

        def simulate_group(group: Group):
            if group.name == "Infected":
                for member in group:
                    if member.make_tick("infectious"):
                        n_inner, n_outer = np.random.poisson(c_inner), np.random.poisson(c_outer)

                        if self.arrange_lockdown:
                            n_inner, n_outer = np.random.poisson(3/2 * c_inner), np.random.poisson(1/2 * c_outer)

                        if member.quarantined:
                            n_inner, n_outer = np.random.poisson(2 * c_inner), 0

                        gen_params = lambda: {
                            "infection_probability_heuristic": infection_heuristic,
                            "vaccine_failure_probability_heuristic": vaccine_heuristic,
                            "incubation_period": np.random.poisson(c_incubation),
                            "infection_period": np.random.poisson(c_infection),
                            "immunity_period": np.random.poisson(c_immunity),
                        }

                        household = self.population.households[member.properties["household"]]
                        new_members["newly_infected"] += household.spread_disease(member, n_inner, tick, gen_params())
                        new_members["newly_infected"] += self.population.spread_disease(member, n_outer, tick,
                                                                                        gen_params())

                        if member.make_tick("recover"):
                            new_members["newly_recovered"] += [member]
                            member.recovered = True

                        elif np.random.uniform() < mortality_heuristic(member.properties):
                            new_members["new_dead"] += [member]
                            member.make_dead(tick)

                    if member.vaccinated:
                        member.make_tick("vaccine")

                new_members["newly_infected_vac"] = [m for m in new_members["newly_infected"] if m.vaccinated]

            elif group.name == "Recovered":
                for member in group:
                    if member.make_tick("immunity"):
                        new_members["newly_susceptible_rec"] += [member]
                        new_members["newly_susceptible"] += [member]

            elif group.name == "Vaccinated":
                for member in group:
                    if member.make_tick("immunity"):
                        new_members["newly_susceptible_vac"] += [member]
                        new_members["newly_susceptible"] += [member]

                    elif not member.infected:
                        member.make_tick("vaccine")

            elif group.name == "Dead":
                pass

            elif group.name == "Quarantined":
                for member in group:
                    if member.make_tick("quarantine", tick):
                        group.remove_member(member)

            else:
                raise ValueError("Group '" + group.name + "' does not have an update function")

        def move_members_to_new_groups():
            for member in new_members["newly_susceptible_rec"]:
                self.groups["Recovered"].remove_member(member)
                member.recovered = False

            for member in new_members["newly_susceptible_vac"]:
                self.groups["Vaccinated"].remove_member(member)
                member.vaccinated = False

            Group.move(new_members["newly_recovered"], self.groups["Infected"], self.groups["Recovered"])

            Group.move(new_members["new_dead"], self.groups["Infected"], self.groups["Dead"])

            for member in new_members["new_dead"]:
                self.groups["Quarantined"].remove_member(member)

            for member in new_members["newly_infected"]:
                self.groups["Infected"].add_member(member)
                if member.vaccinated:
                    self.groups["Vaccinated"].remove_member(member)
                    member.vaccinated = False

        def simulate_vaccinations():
            for member in new_members["staged_vaccinated"]:
                gen_params = lambda: {
                    "t_vac_effect": np.random.poisson(c_vac_effect),
                    "t_immunity": np.random.poisson(c_vac_immunity),
                    "t_wait_vac": t_wait_vac,
                    "t_wait_rec": t_wait_rec
                }

                if member.vaccinate(tick, gen_params()):
                    if member.recovered:
                        Group.move([member], self.groups["Recovered"], self.groups["Vaccinated"])
                        member.recovered = False

                    elif not member.vaccinated:
                        self.groups["Vaccinated"].add_member(member)

                    member.vaccinated = True

                else:
                    new_members["not_vaccinated"] += [member]

        def simulate_tests():
            def test_and_quarantine_procedure(member):
                result = False
                if not member.quarantined and tick != member._last_tested:
                    if (not self.settings["test_vaccinated"]) and "vaccinations" in member.properties.keys() and \
                            tick < member.properties["vaccinations"][-1][0] + self.settings["vaccination_immunity_time"]:
                        return False

                    result = member.test(tick)
                    results[result] += 1
                    if result:
                        member.quarantine(self.settings["quarantine_duration"])
                        self.groups["Quarantined"].add_member(member)

                return result

            def backtrack(member, depth):
                if depth <= 0 or np.random.uniform() > self.settings["backtracking_probability"]:
                    return

                for contact in member.recent_contacts:
                    if test_and_quarantine_procedure(contact):
                        backtrack(contact, depth - 1)

            n_tests = min(np.random.poisson(c_tests), self.population.size)
            results = [0, 0]
            for member in np.random.choice(self.population.members, size=n_tests, replace=False):
                if test_and_quarantine_procedure(member):
                    backtrack(member, self.settings["backtracking_depth"])

            self.stats["test_results_-"] += [results[0]]
            self.stats["test_results_+"] += [results[1]]

        def decide_measure(measure: str):
            if measure == "lockdown":
                # Would a lockdown be arranged/prolonged?
                if self.settings["start_lockdown_at"] <= self.stats["seven_day_incidence"][-1]:
                    result = True

                elif self.settings["end_lockdown_at"] >= self.stats["seven_day_incidence"][-1]:
                    result = False

                else:
                    result = self.arrange_lockdown

                # If it is (not) prolonged, is it still in the allowed duration interval?
                if result:
                    if self.lockdown_duration >= self.settings["maximum_lockdown_duration"]:
                        self.lockdown_duration = 0
                        self.lockdown_ended = tick
                        result = False

                else:
                    if self.arrange_lockdown and self.lockdown_duration < self.settings["minimum_lockdown_duration"]:
                        result = True

                    else:
                        if self.arrange_lockdown:
                            self.lockdown_ended = tick

                        self.lockdown_duration = 0

                # If a new lockdown is arranged, is it sufficiently spaced from the last?
                if result and not self.arrange_lockdown:
                    result = tick - self.lockdown_ended > self.settings["lockdown_gap"]

                # Are enough people vaccinated?
                if self.groups["Vaccinated"].size >= self.settings["heard_immunity"] * self.population.size:
                    result = self.arrange_lockdown

                self.lockdown_duration += result

                return result

            else:
                raise ValueError("Measure not available")

        def update_stats():
            def calc_7di():
                positive = self.stats["test_results_+"]
                if len(positive) >= 7:
                    return round(sum(positive[-7:]) * 100000 / self.population.size)
                else:
                    return round(sum(positive) * 7 / len(positive) * 100000 / self.population.size)

            self.stats["new_infected"] += [len(new_members["newly_infected"])]
            self.stats["new_recovered"] += [len(new_members["newly_recovered"])]
            self.stats["new_susceptible"] += [len(new_members["newly_susceptible"])]
            self.stats["new_vaccinated"] += [n_vacs - len(new_members["not_vaccinated"])]
            self.stats["new_dead"] += [len(new_members["new_dead"])]
            self.stats["ill_vaccinated"] += [len(new_members["newly_infected_vac"])]
            self.stats["seven_day_incidence"] += [calc_7di()]
            self.stats["in_lockdown"] += [1 if self.arrange_lockdown else 0]

        def print_stats():
            color = bcolors.FAIL if self.arrange_lockdown else bcolors.OKGREEN
            print(color + "\rDay: %04d, #Infected: %d, #Vaccinated: %d, #Quarantined: %d, #Dead: %d, 7di: %d\033[K"
                  % (tick,
                     self.groups["Infected"].size,
                     self.groups["Vaccinated"].size,
                     self.groups["Quarantined"].size,
                     self.groups["Dead"].size,
                     self.stats["seven_day_incidence"][-1]), end="")

        print("\nInitializing simulation...")

        # c -> put into poisson, n -> fixed value
        tick = 0
        infection_heuristic = self.settings["infection_probability_heuristic"]
        mortality_heuristic = self.settings["mortality_probability_heuristic"]
        vaccine_heuristic = self.settings["vaccine_failure_probability_heuristic"]
        c_inner = self.settings["inner_reproduction_number"]
        c_outer = self.settings["outer_reproduction_number"]
        n_ini_inf = self.settings["number_of_initially_infected"]
        n_ini_recs = self.settings["number_of_initially_recovered"]
        n_ini_vacs = self.settings["number_of_initially_vaccinated"]
        c_incubation = self.settings["incubation_time"]
        c_infection = self.settings["infection_time"]
        c_immunity = self.settings["recovered_immunity_time"]
        c_vac_effect = self.settings["vaccination_takes_effect_time"]
        c_vac_immunity = self.settings["vaccination_immunity_time"]
        c_vacs = self.settings["vaccinations_per_day"]
        c_tests = self.settings["tests_per_day"]
        t_wait_vac = self.settings["waiting_time_vaccinated_until_new_vaccination"]
        t_wait_rec = self.settings["waiting_time_recovered_until_vaccination"]
        t_vac_available = self.settings["vaccine_available_as_of"]
        max_t = self.settings["maximum_simulation_time_interval"]

        initialize_groups()

        print("Finished initializing simulation.\n\nStarting simulation...")

        print_stats()

        while True:
            tick += 1
            n_vacs = min(np.random.poisson(c_vacs), self.population.size)
            n_vacs = round(n_vacs * np.exp(-100 / (tick - t_vac_available)) if tick > t_vac_available else 0)

            self.arrange_lockdown = decide_measure("lockdown")

            new_members = {
                "newly_susceptible": [],
                "newly_infected": [],
                "newly_infected_vac": [],
                "newly_recovered": [],
                "newly_susceptible_rec": [],
                "newly_susceptible_vac": [],
                "staged_vaccinated": np.random.choice(self.population.members, size=n_vacs, replace=False),
                "not_vaccinated": [],
                "new_dead": []
            }

            for group in self.groups.values():
                simulate_group(group)

            move_members_to_new_groups()
            simulate_vaccinations()
            simulate_tests()

            for group in self.groups.values():
                group.counter.save_count()

            update_stats()
            print_stats()

            if self.groups["Infected"].size == 0 or tick >= max_t:
                break

        print(bcolors.ENDC + "\nFinished simulation.")

    def end_iteration(self):
        """
        Ends the simulation and saves the population data as
        population.json, the progression data as progression.csv
        and the used settings as settings.cfg in out/POPULATION_NAME/xxxx/.

        If 'override_newest' is set to True, then the latest saved simulation
        is overridden.
        """

        def set_out_path():
            path = ".." + sep + "out" + sep + self.population.name + sep
            if not os.path.exists(path):
                os.mkdir(path)

            newest_iteration = Standalones.get_last_folder(path)
            override = self.settings["override_newest"] \
                if "override_newest" in self.settings.keys() \
                else newest_iteration == "9999"

            if not newest_iteration:
                path += "0000" + sep
                os.mkdir(path)
            else:
                if override:
                    path += newest_iteration + sep
                else:
                    path += f"{int(newest_iteration) + 1 :04d}" + sep
                    os.mkdir(path)

            return path

        def save_disease_progression(path):
            header = ",".join([group.name for group in self.groups.values()] + list(self.stats.keys()))
            rows = np.array([group.history for group in self.groups.values()] +
                            [np.array(stat_values) for stat_values in self.stats.values()]).T
            np.savetxt(path + "progression.csv", rows, fmt='%d', delimiter=",", header=header, comments='')

        def save_options(path: str):
            shutil.copyfile("Settings" + sep + self.settings["file"], path + "settings.cfg")

        print("\nSaving simulation data...")

        out_path = set_out_path()
        self.population.save_as_json(out_path)
        save_disease_progression(out_path)
        save_options(out_path)

        if self.settings["override_newest"] and os.path.exists(out_path + "Plots"):
            shutil.rmtree(out_path + "Plots")

        print("Finished saving simulation data.")

    def change_settings(self, settings: dict):
        """
        Change the settings of the current simulation.

        If 'population_file' does not change, loading it is omitted.

        Parameters
        ----------
        settings : dict
            The dictionary containing all relevant settings for
            the simulation.
        """

        def check_settings():
            all_settings = Standalones.make_settings("Template.cfg")

            for property in all_settings.keys():
                if property not in settings.keys():
                    settings[property] = all_settings[property]

            if settings["start_lockdown_at"] < settings["end_lockdown_at"]:
                raise ValueError("end_lockdown_at must be smaller than start_lockdown_at")

            return settings

        self.settings = check_settings()

        if self.settings["population_file"] != settings["population_file"]:
            self.population = Population.load_from_file(self.settings["population_file"])

    def reset(self):
        """
        Resets simulation to its state of initialization.
        """

        self.population = self._population_init.copy()

        for group in self.groups.values():
            group.reset()

        for stat in self.stats.keys():
            self.stats[stat] = [0]

        self.arrange_lockdown = False
        self.lockdown_duration = 0
        self.lockdown_ended = -np.inf

Methods

def change_settings(self, settings: dict)

Change the settings of the current simulation.

If 'population_file' does not change, loading it is omitted.

Parameters

settings : dict
The dictionary containing all relevant settings for the simulation.
Expand source code
def change_settings(self, settings: dict):
    """
    Change the settings of the current simulation.

    If 'population_file' does not change, loading it is omitted.

    Parameters
    ----------
    settings : dict
        The dictionary containing all relevant settings for
        the simulation.
    """

    def check_settings():
        all_settings = Standalones.make_settings("Template.cfg")

        for property in all_settings.keys():
            if property not in settings.keys():
                settings[property] = all_settings[property]

        if settings["start_lockdown_at"] < settings["end_lockdown_at"]:
            raise ValueError("end_lockdown_at must be smaller than start_lockdown_at")

        return settings

    self.settings = check_settings()

    if self.settings["population_file"] != settings["population_file"]:
        self.population = Population.load_from_file(self.settings["population_file"])
def end_iteration(self)

Ends the simulation and saves the population data as population.json, the progression data as progression.csv and the used settings as settings.cfg in out/POPULATION_NAME/xxxx/.

If 'override_newest' is set to True, then the latest saved simulation is overridden.

Expand source code
def end_iteration(self):
    """
    Ends the simulation and saves the population data as
    population.json, the progression data as progression.csv
    and the used settings as settings.cfg in out/POPULATION_NAME/xxxx/.

    If 'override_newest' is set to True, then the latest saved simulation
    is overridden.
    """

    def set_out_path():
        path = ".." + sep + "out" + sep + self.population.name + sep
        if not os.path.exists(path):
            os.mkdir(path)

        newest_iteration = Standalones.get_last_folder(path)
        override = self.settings["override_newest"] \
            if "override_newest" in self.settings.keys() \
            else newest_iteration == "9999"

        if not newest_iteration:
            path += "0000" + sep
            os.mkdir(path)
        else:
            if override:
                path += newest_iteration + sep
            else:
                path += f"{int(newest_iteration) + 1 :04d}" + sep
                os.mkdir(path)

        return path

    def save_disease_progression(path):
        header = ",".join([group.name for group in self.groups.values()] + list(self.stats.keys()))
        rows = np.array([group.history for group in self.groups.values()] +
                        [np.array(stat_values) for stat_values in self.stats.values()]).T
        np.savetxt(path + "progression.csv", rows, fmt='%d', delimiter=",", header=header, comments='')

    def save_options(path: str):
        shutil.copyfile("Settings" + sep + self.settings["file"], path + "settings.cfg")

    print("\nSaving simulation data...")

    out_path = set_out_path()
    self.population.save_as_json(out_path)
    save_disease_progression(out_path)
    save_options(out_path)

    if self.settings["override_newest"] and os.path.exists(out_path + "Plots"):
        shutil.rmtree(out_path + "Plots")

    print("Finished saving simulation data.")
def reset(self)

Resets simulation to its state of initialization.

Expand source code
def reset(self):
    """
    Resets simulation to its state of initialization.
    """

    self.population = self._population_init.copy()

    for group in self.groups.values():
        group.reset()

    for stat in self.stats.keys():
        self.stats[stat] = [0]

    self.arrange_lockdown = False
    self.lockdown_duration = 0
    self.lockdown_ended = -np.inf
def start_iteration(self)

Starts the simulation and runs it until infection numbers drop to zero or time given in 'maximum_simulation_time_interval' is reached.

Expand source code
def start_iteration(self):
    """
    Starts the simulation and runs it until infection numbers
    drop to zero or time given in 'maximum_simulation_time_interval'
    is reached.
    """

    def initialize_groups():
        def put_inits_in_respective_group():
            gen_params = lambda: {
                "incubation_period": -1,
                "infection_period": np.random.poisson(c_infection),
                "immunity_period": np.random.poisson(c_immunity)
            }

            for ini_inf in ini_infs:
                ini_inf.infect(ini_inf, 0, gen_params())
                self.groups["Infected"].add_member(ini_inf)

            for ini_rec in ini_recs:
                ini_rec.make_immune(np.random.poisson(c_immunity))
                self.groups["Recovered"].add_member(ini_rec)
                ini_rec.recovered = True

            for ini_vac in ini_vacs:
                ini_vac.make_immune(np.random.poisson(c_vac_immunity))
                self.groups["Vaccinated"].add_member(ini_vac)
                ini_vac.vaccinated = True

            for group in self.groups.values():
                group.counter.save_count()
                group.counter.squash_history()

        infs_recs_vacs = np.random.choice(self.population.members,
                                          size=n_ini_inf + n_ini_recs + n_ini_vacs,
                                          replace=False)

        ini_infs = infs_recs_vacs[:n_ini_inf]
        ini_recs = infs_recs_vacs[n_ini_inf:n_ini_inf + n_ini_recs]
        ini_vacs = infs_recs_vacs[n_ini_inf + n_ini_recs:]

        put_inits_in_respective_group()

    def simulate_group(group: Group):
        if group.name == "Infected":
            for member in group:
                if member.make_tick("infectious"):
                    n_inner, n_outer = np.random.poisson(c_inner), np.random.poisson(c_outer)

                    if self.arrange_lockdown:
                        n_inner, n_outer = np.random.poisson(3/2 * c_inner), np.random.poisson(1/2 * c_outer)

                    if member.quarantined:
                        n_inner, n_outer = np.random.poisson(2 * c_inner), 0

                    gen_params = lambda: {
                        "infection_probability_heuristic": infection_heuristic,
                        "vaccine_failure_probability_heuristic": vaccine_heuristic,
                        "incubation_period": np.random.poisson(c_incubation),
                        "infection_period": np.random.poisson(c_infection),
                        "immunity_period": np.random.poisson(c_immunity),
                    }

                    household = self.population.households[member.properties["household"]]
                    new_members["newly_infected"] += household.spread_disease(member, n_inner, tick, gen_params())
                    new_members["newly_infected"] += self.population.spread_disease(member, n_outer, tick,
                                                                                    gen_params())

                    if member.make_tick("recover"):
                        new_members["newly_recovered"] += [member]
                        member.recovered = True

                    elif np.random.uniform() < mortality_heuristic(member.properties):
                        new_members["new_dead"] += [member]
                        member.make_dead(tick)

                if member.vaccinated:
                    member.make_tick("vaccine")

            new_members["newly_infected_vac"] = [m for m in new_members["newly_infected"] if m.vaccinated]

        elif group.name == "Recovered":
            for member in group:
                if member.make_tick("immunity"):
                    new_members["newly_susceptible_rec"] += [member]
                    new_members["newly_susceptible"] += [member]

        elif group.name == "Vaccinated":
            for member in group:
                if member.make_tick("immunity"):
                    new_members["newly_susceptible_vac"] += [member]
                    new_members["newly_susceptible"] += [member]

                elif not member.infected:
                    member.make_tick("vaccine")

        elif group.name == "Dead":
            pass

        elif group.name == "Quarantined":
            for member in group:
                if member.make_tick("quarantine", tick):
                    group.remove_member(member)

        else:
            raise ValueError("Group '" + group.name + "' does not have an update function")

    def move_members_to_new_groups():
        for member in new_members["newly_susceptible_rec"]:
            self.groups["Recovered"].remove_member(member)
            member.recovered = False

        for member in new_members["newly_susceptible_vac"]:
            self.groups["Vaccinated"].remove_member(member)
            member.vaccinated = False

        Group.move(new_members["newly_recovered"], self.groups["Infected"], self.groups["Recovered"])

        Group.move(new_members["new_dead"], self.groups["Infected"], self.groups["Dead"])

        for member in new_members["new_dead"]:
            self.groups["Quarantined"].remove_member(member)

        for member in new_members["newly_infected"]:
            self.groups["Infected"].add_member(member)
            if member.vaccinated:
                self.groups["Vaccinated"].remove_member(member)
                member.vaccinated = False

    def simulate_vaccinations():
        for member in new_members["staged_vaccinated"]:
            gen_params = lambda: {
                "t_vac_effect": np.random.poisson(c_vac_effect),
                "t_immunity": np.random.poisson(c_vac_immunity),
                "t_wait_vac": t_wait_vac,
                "t_wait_rec": t_wait_rec
            }

            if member.vaccinate(tick, gen_params()):
                if member.recovered:
                    Group.move([member], self.groups["Recovered"], self.groups["Vaccinated"])
                    member.recovered = False

                elif not member.vaccinated:
                    self.groups["Vaccinated"].add_member(member)

                member.vaccinated = True

            else:
                new_members["not_vaccinated"] += [member]

    def simulate_tests():
        def test_and_quarantine_procedure(member):
            result = False
            if not member.quarantined and tick != member._last_tested:
                if (not self.settings["test_vaccinated"]) and "vaccinations" in member.properties.keys() and \
                        tick < member.properties["vaccinations"][-1][0] + self.settings["vaccination_immunity_time"]:
                    return False

                result = member.test(tick)
                results[result] += 1
                if result:
                    member.quarantine(self.settings["quarantine_duration"])
                    self.groups["Quarantined"].add_member(member)

            return result

        def backtrack(member, depth):
            if depth <= 0 or np.random.uniform() > self.settings["backtracking_probability"]:
                return

            for contact in member.recent_contacts:
                if test_and_quarantine_procedure(contact):
                    backtrack(contact, depth - 1)

        n_tests = min(np.random.poisson(c_tests), self.population.size)
        results = [0, 0]
        for member in np.random.choice(self.population.members, size=n_tests, replace=False):
            if test_and_quarantine_procedure(member):
                backtrack(member, self.settings["backtracking_depth"])

        self.stats["test_results_-"] += [results[0]]
        self.stats["test_results_+"] += [results[1]]

    def decide_measure(measure: str):
        if measure == "lockdown":
            # Would a lockdown be arranged/prolonged?
            if self.settings["start_lockdown_at"] <= self.stats["seven_day_incidence"][-1]:
                result = True

            elif self.settings["end_lockdown_at"] >= self.stats["seven_day_incidence"][-1]:
                result = False

            else:
                result = self.arrange_lockdown

            # If it is (not) prolonged, is it still in the allowed duration interval?
            if result:
                if self.lockdown_duration >= self.settings["maximum_lockdown_duration"]:
                    self.lockdown_duration = 0
                    self.lockdown_ended = tick
                    result = False

            else:
                if self.arrange_lockdown and self.lockdown_duration < self.settings["minimum_lockdown_duration"]:
                    result = True

                else:
                    if self.arrange_lockdown:
                        self.lockdown_ended = tick

                    self.lockdown_duration = 0

            # If a new lockdown is arranged, is it sufficiently spaced from the last?
            if result and not self.arrange_lockdown:
                result = tick - self.lockdown_ended > self.settings["lockdown_gap"]

            # Are enough people vaccinated?
            if self.groups["Vaccinated"].size >= self.settings["heard_immunity"] * self.population.size:
                result = self.arrange_lockdown

            self.lockdown_duration += result

            return result

        else:
            raise ValueError("Measure not available")

    def update_stats():
        def calc_7di():
            positive = self.stats["test_results_+"]
            if len(positive) >= 7:
                return round(sum(positive[-7:]) * 100000 / self.population.size)
            else:
                return round(sum(positive) * 7 / len(positive) * 100000 / self.population.size)

        self.stats["new_infected"] += [len(new_members["newly_infected"])]
        self.stats["new_recovered"] += [len(new_members["newly_recovered"])]
        self.stats["new_susceptible"] += [len(new_members["newly_susceptible"])]
        self.stats["new_vaccinated"] += [n_vacs - len(new_members["not_vaccinated"])]
        self.stats["new_dead"] += [len(new_members["new_dead"])]
        self.stats["ill_vaccinated"] += [len(new_members["newly_infected_vac"])]
        self.stats["seven_day_incidence"] += [calc_7di()]
        self.stats["in_lockdown"] += [1 if self.arrange_lockdown else 0]

    def print_stats():
        color = bcolors.FAIL if self.arrange_lockdown else bcolors.OKGREEN
        print(color + "\rDay: %04d, #Infected: %d, #Vaccinated: %d, #Quarantined: %d, #Dead: %d, 7di: %d\033[K"
              % (tick,
                 self.groups["Infected"].size,
                 self.groups["Vaccinated"].size,
                 self.groups["Quarantined"].size,
                 self.groups["Dead"].size,
                 self.stats["seven_day_incidence"][-1]), end="")

    print("\nInitializing simulation...")

    # c -> put into poisson, n -> fixed value
    tick = 0
    infection_heuristic = self.settings["infection_probability_heuristic"]
    mortality_heuristic = self.settings["mortality_probability_heuristic"]
    vaccine_heuristic = self.settings["vaccine_failure_probability_heuristic"]
    c_inner = self.settings["inner_reproduction_number"]
    c_outer = self.settings["outer_reproduction_number"]
    n_ini_inf = self.settings["number_of_initially_infected"]
    n_ini_recs = self.settings["number_of_initially_recovered"]
    n_ini_vacs = self.settings["number_of_initially_vaccinated"]
    c_incubation = self.settings["incubation_time"]
    c_infection = self.settings["infection_time"]
    c_immunity = self.settings["recovered_immunity_time"]
    c_vac_effect = self.settings["vaccination_takes_effect_time"]
    c_vac_immunity = self.settings["vaccination_immunity_time"]
    c_vacs = self.settings["vaccinations_per_day"]
    c_tests = self.settings["tests_per_day"]
    t_wait_vac = self.settings["waiting_time_vaccinated_until_new_vaccination"]
    t_wait_rec = self.settings["waiting_time_recovered_until_vaccination"]
    t_vac_available = self.settings["vaccine_available_as_of"]
    max_t = self.settings["maximum_simulation_time_interval"]

    initialize_groups()

    print("Finished initializing simulation.\n\nStarting simulation...")

    print_stats()

    while True:
        tick += 1
        n_vacs = min(np.random.poisson(c_vacs), self.population.size)
        n_vacs = round(n_vacs * np.exp(-100 / (tick - t_vac_available)) if tick > t_vac_available else 0)

        self.arrange_lockdown = decide_measure("lockdown")

        new_members = {
            "newly_susceptible": [],
            "newly_infected": [],
            "newly_infected_vac": [],
            "newly_recovered": [],
            "newly_susceptible_rec": [],
            "newly_susceptible_vac": [],
            "staged_vaccinated": np.random.choice(self.population.members, size=n_vacs, replace=False),
            "not_vaccinated": [],
            "new_dead": []
        }

        for group in self.groups.values():
            simulate_group(group)

        move_members_to_new_groups()
        simulate_vaccinations()
        simulate_tests()

        for group in self.groups.values():
            group.counter.save_count()

        update_stats()
        print_stats()

        if self.groups["Infected"].size == 0 or tick >= max_t:
            break

    print(bcolors.ENDC + "\nFinished simulation.")