Source code for cdcam.model

"""Cambridge Communications Assessment Model
"""
from collections import defaultdict
from itertools import tee

[docs]class NetworkManager(object): """Model controller class. Represents lower level statistical units (postcode sectors) nested within upper level statistical units (local area districts), with all affiliated assets, capacities and clutter types. Parameters ---------- lads: :obj:`list` of :obj:`dict` List of local area districts * id: :obj:`int` Unique ID * name: :obj:`str` Name of the LAD pcd_sectors: :obj:`list` of :obj:`dict` List of postcode sectors (pcd) * id: :obj:`str` Postcode name * lad_id: :obj:`int` Unique ID * population: :obj:`int` Number of inhabitants * area: :obj:`float` Areas size in square kilometers (km²) * user_throughput: :obj:`int` Per user monthly data demand in gigabytes (GB) assets: :obj:`list` of :obj:`dict` List of assets * pcd_sector: :obj:`str` Code of the postcode sector * site_ngr: :obj:`int` Unique site reference number * technology: :obj:`str` Abbreviation of the asset technology (LTE, 5G etc.) * frequency: :obj:`str` Spectral frequency(s) the asset operates at (800, 2600, ..) * type: :obj:`str` The type of cell site (macrocell site, small cell site...) * build_date: :obj:`int` Build year of the asset capacity_lookup_table: dict Dictionary that represents the clutter/asset type, spectrum frequency and channel bandwidth, and the consequential cellular capacity provided for different asset densities. * key: :obj:`tuple` * 0: :obj:`str` Area type ('urban', 'suburban' or 'rural') or asset type ('small_cells') * 1: :obj:`str` Frequency of the asset configuration (800, 2600, ..) * 2: :obj:`str` Bandwith of the asset configuration (10, 40, ..) * 3: :obj:`str` Technology generation (4G, 5G) * value: :obj:`list` of :obj:`tuple` * 0: :obj:`int` Site asset density per square kilometer (sites per km²) * 1: :obj:`int` Mean Cell Edge capacity in Mbps per square kilometer (Mbps/km²) clutter_lookup: list of tuples Each element represents the settlement definitions for urban, suburban and rural by population density in square kilometers (persons per km²) * 0: :obj:`int` Population density in persons per km². * 1: :obj:`string` Settlement type (urban, suburban and rural) simulation_parameters: dict Contains all simulation parameters, set in the run script. * market_share: :obj:`int` Percentage market share of the modelled hypothetical operator. * annual_budget: :obj:`int` Annual budget to spend. * service_obligation_capacity: :obj:`int` Required service obligation. * busy_hour_traffic_percentage: :obj:`int` Percentage of daily traffic taking place in the busy hour. * coverage_threshold: :obj:`int` The threshold we wish to measure the served population against. * penetration: :obj:`int` The penetration of users with smartphone and data access. * channel_bandwidth: :obj:`int` Carrier bandwidth by frequency. * macro_sectors: :obj:`int` Number of sectors per macrocell. * small-cell_sectors: :obj:`int` Number of sectors per small cell. * mast_height: :obj:`int` Mast height for the sites being assessed. """ def __init__(self, lads, pcd_sectors, assets, capacity_lookup_table, clutter_lookup, simulation_parameters): self.lads = {} self.postcode_sectors = {} for lad_data in lads: lad_id = lad_data["id"] self.lads[lad_id] = LAD(lad_data, simulation_parameters) assets_by_pcd = defaultdict(list) for asset in assets: assets_by_pcd[asset['pcd_sector']].append(asset) for pcd_sector_data in pcd_sectors: try: lad_id = pcd_sector_data["lad_id"] pcd_sector_id = pcd_sector_data["id"] assets = assets_by_pcd[pcd_sector_id] pcd_sector = PostcodeSector(pcd_sector_data, assets, capacity_lookup_table, clutter_lookup, simulation_parameters) self.postcode_sectors[pcd_sector_id] = pcd_sector lad_containing_pcd_sector = self.lads[lad_id] lad_containing_pcd_sector.add_pcd_sector(pcd_sector) except: print('could not create object for {}'.format(pcd_sector_data["id"])) pass
[docs]class LAD(object): """Local area district - Higher level statistical unit. Represents an area to be modelled. Contains data for demand characterisation and assets for supply assessment. Arguments --------- data: dict Metadata and info for the LAD * id: :obj:`int` Unique ID * name: :obj:`str` Name of the LAD simulation_parameters: dict Contains all simulation parameters, set in the run script. * market_share: :obj:`int` Percentage market share of the modelled hypothetical operator. * annual_budget: :obj:`int` Annual budget to spend. * service_obligation_capacity: :obj:`int` Required service obligation. * busy_hour_traffic_percentage: :obj:`int` Percentage of daily traffic taking place in the busy hour. * coverage_threshold: :obj:`int` The threshold we wish to measure the served population against. * penetration: :obj:`int` The penetration of users with smartphone and data access. * channel_bandwidth: :obj:`int` Carrier bandwidth by frequency. * macro_sectors: :obj:`int` Number of sectors per macrocell. * small-cell_sectors: :obj:`int` Number of sectors per small cell. * mast_height: :obj:`int` Mast height for the sites being assessed. """ def __init__(self, data, simulation_parameters): self.id = data["id"] self.name = data["name"] self._pcd_sectors = {} def __repr__(self): return "<LAD id:{} name:{}>".format(self.id, self.name) @property def population(self): return sum([ pcd_sector.population for pcd_sector in self._pcd_sectors.values()]) @property def area(self): return sum([ pcd_sector.area for pcd_sector in self._pcd_sectors.values()]) @property def population_density(self): total_area = sum([ pcd_sector.area for pcd_sector in self._pcd_sectors.values()]) if total_area == 0: return 0 else: return self.population / total_area
[docs] def add_pcd_sector(self, pcd_sector): self._pcd_sectors[pcd_sector.id] = pcd_sector
[docs] def demand(self): """Return the mean demand (Mbps km²) from all nested postcode sectors. """ if not self._pcd_sectors: return 0 summed_demand = sum( pcd_sector.demand * pcd_sector.area for pcd_sector in self._pcd_sectors.values() ) summed_area = sum( pcd_sector.area for pcd_sector in self._pcd_sectors.values() ) return summed_demand / summed_area
[docs] def capacity(self): """Return the mean capacity (Mbps km²) for all nested postcode sectors. """ if not self._pcd_sectors: return 0 summed_capacity = sum([ pcd_sector.capacity for pcd_sector in self._pcd_sectors.values()]) return summed_capacity / len(self._pcd_sectors)
[docs] def coverage(self, simulation_parameters): """Return proportion of population with coverage over a threshold (e.g. 10 Mbps). """ if not self._pcd_sectors: return 0 threshold = simulation_parameters['coverage_threshold'] population_with_coverage = sum([ pcd_sector.population for pcd_sector in self._pcd_sectors.values() if pcd_sector.capacity >= threshold]) total_pop = sum([ pcd_sector.population for pcd_sector in self._pcd_sectors.values()]) return float(population_with_coverage) / total_pop
[docs]class PostcodeSector(object): """Postcode Sector - Lower level statistical unit. Represents an area to be modelled. Contains data for demand characterisation and assets for supply assessment. Arguments --------- data: dict Metadata and info for the LAD * id: :obj:`int` Unique ID. * lad_id: :obj:`int` The Local Authority District which this area is within. * population: :obj:`int` Number of inhabitants. * area: :obj:`int` Geographic area (km²). * user_throughput: :obj:`int` Monthly user data consumption (GB). * population: :obj:`int` Number of inhabitants. * area: :obj:`int` Geographic area (km²). assets: :obj:`list` of :obj:`dict` List of assets * pcd_sector: :obj:`str` Code of the postcode sector * site_ngr: :obj:`int` Unique site reference number * technology: :obj:`str` Abbreviation of the asset technology (LTE, 5G etc.) * frequency: :obj:`str` Spectral frequency(s) the asset operates at (800, 2600, ..) * type: :obj:`str` The type of cell site (macrocell site, small cell site...) * build_date: :obj:`int` Build year of the asset capacity_lookup_table: dict Dictionary that represents the clutter/asset type, spectrum frequency and channel bandwidth, and the consequential cellular capacity provided for different asset densities. * key: :obj:`tuple` * 0: :obj:`str` Area type ('urban', 'suburban' or 'rural') or asset type ('small_cells') * 1: :obj:`str` Frequency of the asset configuration (800, 2600, ..) * 2: :obj:`str` Bandwith of the asset configuration (10, 40, ..) * 3: :obj:`str` Technology generation (4G, 5G) * value: :obj:`list` of :obj:`tuple` * 0: :obj:`int` Site asset density per square kilometer (sites per km²) * 1: :obj:`int` Mean Cell Edge capacity in Mbps per square kilometer (Mbps/km²) clutter_lookup: list of tuples Each element represents the settlement definitions for urban, suburban and rural by population density in square kilometers (persons per km²) * 0: :obj:`int` Population density in persons per km². * 1: :obj:`string` Settlement type (urban, suburban and rural) simulation_parameters: dict Contains all simulation parameters, set in the run script. * market_share: :obj:`int` Percentage market share of the modelled hypothetical operator. * busy_hour_traffic_percentage: :obj:`int` Percentage of daily traffic taking place in the busy hour. * penetration: :obj:`int` The penetration of users with smartphone and data access. """ def __init__(self, data, assets, capacity_lookup_table, clutter_lookup, simulation_parameters): self.id = data["id"] self.lad_id = data["lad_id"] self.population = data["population"] self.area = data["area_km2"] self.user_throughput = data["user_throughput"] self.penetration = simulation_parameters['penetration'] self.busy_hour_traffic = simulation_parameters['busy_hour_traffic_percentage'] self.overbooking_factor = simulation_parameters['overbooking_factor'] self.market_share = simulation_parameters['market_share'] self.user_demand = self._calculate_user_demand( self.user_throughput, simulation_parameters) self.demand_density = self.demand / self.area self._capacity_lookup_table = capacity_lookup_table self._clutter_lookup = clutter_lookup self.clutter_environment = lookup_clutter_geotype( self._clutter_lookup, self.population_density ) self.assets = assets self.site_density_macrocells = self._calculate_site_density_macrocells() self.site_density_small_cells = self._calculate_site_density_small_cells() self.capacity = ( self._macrocell_site_capacity(simulation_parameters) + self.small_cell_capacity(simulation_parameters) ) def __repr__(self): return "<PostcodeSector id:{}>".format(self.id) @property def demand(self): """ Estimate total demand based on: - population - overbooking factor - smartphone penetration - market share - user demand - area E.g.:: 2000 population / 20 * (80% / 100) penetration * (25% / 100) market share = 20 users 20 users * 0.01 Mbps user demand = 0.2 total user throughput 0.2 Mbps total user throughput during the busy hour / 1 km² area = 0.2 Mbps/km² area demand """ users = ( (self.population / self.overbooking_factor) * (self.penetration / 100) * self.market_share ) user_throughput = users * self.user_demand demand_per_kmsq = user_throughput / self.area return demand_per_kmsq @property def population_density(self): """ Calculate population density for a specific population and area (persons per km²). """ return self.population / self.area def _calculate_site_density_macrocells(self): """ Calculate the macrocell site density (sites per km²). """ unique_sites = set() for asset in self.assets: if asset['type'] == 'macrocell_site': unique_sites.add(asset['site_ngr']) site_density = float(len(unique_sites)) / self.area return site_density def _calculate_site_density_small_cells(self): """ Calculate the small cell site density (sites per km²). """ small_cells = [] for asset in self.assets: if asset['type'] == 'small_cell': small_cells.append(asset) site_density = float(len(small_cells)) / self.area return site_density def _calculate_user_demand(self, user_throughput, simulation_parameters): """ Calculate Mb/second from GB/month supplied by throughput scenario. E.g. 2 GB per month * 1024 to find MB * 8 to covert bytes to bits * busy_hour_traffic = daily traffic taking place in the busy hour * 1/30 assuming 30 days per month * 1/3600 converting hours to seconds, = ~0.01 Mbps required per user """ busy_hour_traffic = simulation_parameters['busy_hour_traffic_percentage'] demand = user_throughput * 1024 * 8 * (busy_hour_traffic / 100) / 30 / 3600 if demand < 5: demand = 5 return demand def _macrocell_site_capacity(self, simulation_parameters): """ Find the macrocellular Radio Access Network capacity given the area assets and deployed frequency bands. """ capacity = 0 for frequency in [ '700', '800', '2600', '3500', '26000' ]: unique_sites = set() for asset in self.assets: for asset_frequency in asset['frequency']: if asset_frequency == frequency: unique_sites.add(asset['site_ngr']) site_density = float(len(unique_sites)) / self.area bandwidth = find_frequency_bandwidth(frequency, simulation_parameters) if frequency == '700' or frequency == '3500' or frequency == '26000': generation = '5G' else: generation = '4G' if site_density > 0: tech_capacity = lookup_capacity( self._capacity_lookup_table, self.clutter_environment, 'macro', frequency, bandwidth, generation, site_density, ) else: tech_capacity = 0 capacity += tech_capacity return capacity
[docs] def small_cell_capacity(self, simulation_parameters): """ Find the small cell Radio Access Network capacity given the area assets and deployed frequency bands. """ capacity = 0 for frequency in ['3700', '26000']: num_small_cells = len([ asset for asset in self.assets if asset['type'] == "small_cell" ]) site_density = float(num_small_cells) / self.area bandwidth = find_frequency_bandwidth(frequency, simulation_parameters) if site_density > 0 : tech_capacity = lookup_capacity( self._capacity_lookup_table, self.clutter_environment, "micro", frequency, bandwidth, "5G", site_density, ) else: tech_capacity = 0 capacity += tech_capacity return capacity
[docs]def find_frequency_bandwidth(frequency, simulation_parameters): """ Finds the correct bandwidth for a specific frequency from the simulation parameters. """ simulation_parameter = 'channel_bandwidth_{}'.format(frequency) if simulation_parameter not in simulation_parameters.keys(): KeyError('{} not specified in simulation_parameters'.format(frequency)) bandwidth = simulation_parameters[simulation_parameter] return bandwidth
[docs]def pairwise(iterable): """ Return iterable of 2-tuples in a sliding window. >>> list(pairwise([1,2,3,4])) [(1,2),(2,3),(3,4)] """ a, b = tee(iterable) next(b, None) return zip(a, b)
[docs]def lookup_clutter_geotype(clutter_lookup, population_density): """ Return geotype based on population density Parameters ---------- clutter_lookup : list A list of tuples sorted by population_density_upper_bound ascending (population_density_upper_bound, geotype). population_density : float The current population density requiring the lookup. """ highest_popd, highest_geotype = clutter_lookup[2] middle_popd, middle_geotype = clutter_lookup[1] lowest_popd, lowest_geotype = clutter_lookup[0] if population_density < middle_popd: return lowest_geotype elif population_density > highest_popd: return highest_geotype else: return middle_geotype
[docs]def lookup_capacity(lookup_table, environment, cell_type, frequency, bandwidth, generation, site_density): """ Use lookup table to find capacity by clutter environment geotype, frequency, bandwidth, technology generation and site density. """ if (environment, cell_type, frequency, bandwidth, generation) not in lookup_table: raise KeyError("Combination %s not found in lookup table", (environment, cell_type, frequency, bandwidth, generation)) density_capacities = lookup_table[ (environment, cell_type, frequency, bandwidth, generation) ] lowest_density, lowest_capacity = density_capacities[0] if site_density < lowest_density: return 0 for a, b in pairwise(density_capacities): lower_density, lower_capacity = a upper_density, upper_capacity = b if lower_density <= site_density and site_density < upper_density: result = interpolate( lower_density, lower_capacity, upper_density, upper_capacity, site_density ) return result # If not caught between bounds return highest capacity highest_density, highest_capacity = density_capacities[-1] return highest_capacity
[docs]def interpolate(x0, y0, x1, y1, x): """ Linear interpolation between two values. """ y = (y0 * (x1 - x) + y1 * (x - x0)) / (x1 - x0) return y