#   pyRCV2: Preferential vote counting
#   Copyright © 2020–2021  Lee Yingtong Li (RunasSudo)
#
#   This program is free software: you can redistribute it and/or modify
#   it under the terms of the GNU Affero General Public License as published by
#   the Free Software Foundation, either version 3 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU Affero General Public License for more details.
#
#   You should have received a copy of the GNU Affero General Public License
#   along with this program.  If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# No-op stand-in so that the `__pragma__('opov')` / `__pragma__('noopov')`
# calls in this module are harmless when executed as ordinary function calls.
# NOTE(review): presumably these are Transcrypt compiler directives (the
# module mentions Transcrypt elsewhere) - confirm.
__pragma__ = lambda x: None
|
|
|
|
from pyRCV2.exceptions import BaseRCVException
|
|
from pyRCV2.model import CandidateState
|
|
from pyRCV2.safedict import SafeDict
|
|
|
|
class ConstraintException(BaseRCVException):
    """Error raised when the constraints cannot be complied with."""
    pass
|
|
|
|
class ConstraintMatrix:
    """
    Represents a constraint matrix/table, as described by Otten

    The cells are stored in a single flat list. Along every axis one extra
    slot is reserved for that axis's margin, which is addressed with -1.
    """

    def __init__(self, dimensions):
        """
        Create a matrix of fresh cells.

        dimensions: one size per axis (margins are added automatically).
        """
        self.dimensions = dimensions

        # Each axis contributes its own size plus one margin slot
        total_cells = 1
        for size in dimensions:
            total_cells = total_cells * (size + 1)

        # FIXME: This is not the most efficient packing, as we require margins only along one dimension at a time
        self.matrix = [ConstraintMatrixCell() for _ in range(total_cells)]

    def index_of(self, address):
        """
        Convert an address (one coordinate per axis) into a flat list index.

        Pass -1 as a coordinate to get the margin along that axis.
        """
        flat = 0
        for axis, size in enumerate(self.dimensions):
            # Shift coordinates up by one so that the margin (-1) lands on slot 0
            flat = flat * (size + 1) + (address[axis] + 1)
        return flat

    def get(self, address):
        """Return the cell stored at the given address."""
        flat = self.index_of(address)
        return self.matrix[flat]
|
|
|
|
class ConstraintMatrixCell:
    """A single cell of a constraint matrix: two tallies and a min/max bound."""

    def __init__(self):
        # Every figure starts at zero and is filled in by the matrix routines
        self.elected = 0
        self.min = 0
        self.max = 0
        self.cands = 0

    def __repr__(self):
        """Return a compact summary of the four figures held by the cell."""
        template = '<ConstraintMatrixCell: E={}, Min={}, Max={}, C={}>'
        return template.format(self.elected, self.min, self.max, self.cands)
|
|
|
|
def ndrange(dimensions):
    """
    n-dimensional range function

    Yields every address (a list with one coordinate per axis) within the
    given dimensions, with the last axis varying fastest. An empty list of
    dimensions yields a single empty address.
    """
    if len(dimensions) == 0:
        yield []
        return

    for head in range(dimensions[0]):
        for tail in ndrange(dimensions[1:]):
            # Build the address with extend() rather than list `+`, so no
            # operator-overloading pragma is required
            coords = [head]
            coords.extend(tail)
            yield coords
|
|
|
|
def init_matrix(counter):
    """
    Initialise matrix with initial constraints

    Builds a matrix with one axis per constraint category (sized by the
    number of groups in it), stores it on the counter, fills in the current
    tallies, then copies each group's min/max onto its margin cell.
    """
    constraints = counter.election.constraints

    matrix = ConstraintMatrix([len(groups) for name, groups in constraints.items()])
    counter._constraint_matrix = matrix
    update_matrix(counter)

    # Add min/max to margins
    num_axes = len(matrix.dimensions)
    for axis, category_entry in enumerate(constraints.items()):
        category_name, groups = category_entry
        for position, group_entry in enumerate(groups.items()):
            group_name, constraint = group_entry

            # A margin is addressed by -1 on every axis except its own
            margin_address = [-1 for _ in range(num_axes)]
            margin_address[axis] = position

            margin = matrix.get(margin_address)
            margin.min = constraint.min
            margin.max = constraint.max
|
|
|
|
def candidate_to_address(constraints, candidate):
    """
    Work out the matrix address of a candidate.

    For each category in turn, appends the position of every group in that
    category whose `candidates` contains the candidate.
    """
    coords = []
    for category_name, groups in constraints.items():
        position = 0
        for group_name, constraint in groups.items():
            if candidate in constraint.candidates:
                coords.append(position)
            position += 1
    return coords
|
|
|
|
def update_matrix(counter):
    """
    Update matrix with elected/hopeful numbers

    Recounts, from the counter's current candidate states, the `cands` and
    `elected` figures of every cell:

    - `cands` counts candidates who are hopeful, guarded, provisionally
      elected, distributing a surplus or elected
    - `elected` counts candidates who are provisionally elected, distributing
      a surplus or elected

    Each candidate is tallied in their own cell, in the grand total cell and
    (when there is more than one dimension) in the cells obtained by blanking
    one axis of their address. Finally the grand total's max is set to the
    grand total's `cands`.
    """
    cm = counter._constraint_matrix
    num_dimensions = len(cm.dimensions)

    # Reset elected/cands on *every* cell. Margins are incremented below, so
    # they must be cleared too, otherwise their tallies would accumulate
    # across successive calls. min/max are deliberately left untouched.
    for cell in cm.matrix:
        cell.elected = 0
        cell.cands = 0

    cell_gt = cm.get([-1 for _ in range(num_dimensions)])

    electable_states = (CandidateState.HOPEFUL, CandidateState.GUARDED, CandidateState.PROVISIONALLY_ELECTED, CandidateState.DISTRIBUTING_SURPLUS, CandidateState.ELECTED)
    elected_states = (CandidateState.PROVISIONALLY_ELECTED, CandidateState.DISTRIBUTING_SURPLUS, CandidateState.ELECTED)

    # Count elected/cands
    for candidate, count_card in counter.candidates.items():
        if count_card.state not in electable_states:
            continue

        address = candidate_to_address(counter.election.constraints, candidate)

        # Every cell this candidate contributes to
        cells = [cm.get(address), cell_gt]

        if num_dimensions > 1:
            # If only 1 dimension, this is equivalent to cell_gt
            # NOTE(review): with exactly two dimensions, blanking one axis
            # gives the margin of the other category. With more dimensions
            # this is not the same cell as the margins filled by init_matrix
            # - confirm intended behaviour.
            for dimension in range(num_dimensions):
                addr2 = [x for x in address]
                addr2[dimension] = -1
                cells.append(cm.get(addr2))

        is_elected = count_card.state in elected_states
        for cell in cells:
            cell.cands += 1
            if is_elected:
                cell.elected += 1

    # No more candidates can be elected overall than remain in the count
    cell_gt.max = cell_gt.cands
|
|
|
|
def step_matrix(counter):
    """
    Adjust the matrix one step towards stability
    Return True if stable, or False if an adjustment was required

    At most one min/max adjustment is made per call; the caller is expected
    to call again until True is returned.

    Rules applied, in order:

    1. In every cell, min is at least `elected` and max is at most `cands`
    2. An inner cell's min is at least its margin's min less the sum of the
       max of the other cells along the axis being checked
    3. An inner cell's max is at most its margin's max less the sum of the
       min of the other cells along the axis being checked
    4. A margin's min is at least the sum of the min of its inner cells
    5. A margin's max is at most the sum of the max of its inner cells

    Raises ConstraintException if a cell is found whose min exceeds its max.
    """
    cm = counter._constraint_matrix
    num_dimensions = len(cm.dimensions)

    # Rule 1
    for cell in cm.matrix:
        if cell.elected > cell.min:
            cell.min = cell.elected
            return False
        if cell.cands < cell.max:
            cell.max = cell.cands
            return False
        if cell.min > cell.max:
            raise ConstraintException('No result conformant with the constraints is possible')

    # Rule 2/3
    for address in ndrange(cm.dimensions):
        cell = cm.get(address)

        for dimension1 in range(num_dimensions):  # Which margin total
            for dimension2 in range(num_dimensions):  # Which axis to check along
                if dimension1 == dimension2:
                    continue

                # Total the bounds of the other cells along dimension2
                tot_min_others = 0
                tot_max_others = 0

                addr2 = [x for x in address]
                for i in range(cm.dimensions[dimension2]):
                    if i == address[dimension2]:
                        continue
                    addr2[dimension2] = i

                    cell2 = cm.get(addr2)
                    tot_min_others += cell2.min
                    tot_max_others += cell2.max

                # Margin cell for this cell's group in dimension1
                addr2 = [-1 for _ in range(num_dimensions)]
                addr2[dimension1] = address[dimension1]
                cell_dimension = cm.get(addr2)
                min_dimension = cell_dimension.min
                max_dimension = cell_dimension.max

                # This many must be elected from this cell at least
                this_cell_min = min_dimension - tot_max_others
                # ... and this many at most
                this_cell_max = max_dimension - tot_min_others

                # Rule 2
                if this_cell_min > cell.min:
                    cell.min = this_cell_min
                    return False

                # Rule 3
                if this_cell_max < cell.max:
                    cell.max = this_cell_max
                    return False

    # Rule 4/5
    for dimension in range(num_dimensions):
        for n in range(cm.dimensions[dimension]):
            # Total the bounds of every inner cell with coordinate n on this axis
            tot_min = 0
            tot_max = 0

            for addr1 in ndrange(cm.dimensions[:dimension]):
                for addr2 in ndrange(cm.dimensions[dimension+1:]):
                    __pragma__('opov')
                    address = addr1 + [n] + addr2
                    __pragma__('noopov')
                    tot_min += cm.get(address).min
                    tot_max += cm.get(address).max

            # Margin cell for group n of this dimension
            address = [-1 for _ in range(num_dimensions)]
            address[dimension] = n
            cell = cm.get(address)

            # Rule 4
            if cell.min < tot_min:
                cell.min = tot_min
                # An adjustment was made, so the matrix is not yet known to be
                # stable: report it so the earlier rules are re-applied
                return False

            # Rule 5
            if cell.max > tot_max:
                cell.max = tot_max
                return False

    return True
|
|
|
|
def stabilise_matrix(counter):
    """Run step_matrix until entering a stable state"""
    # step_matrix returns False for as long as it had to make an adjustment
    while not step_matrix(counter):
        pass
|
|
|
|
def _hopefuls_in_cell(counter, address):
    """Return a SafeDict of the hopeful candidates (and their count cards) who belong to every group named by address."""
    remaining = SafeDict([(c, cc) for c, cc in counter.candidates.items() if cc.state == CandidateState.HOPEFUL])

    for axis, entry in enumerate(counter.election.constraints.items()):
        category_name, category = entry
        # The group of this category that the address points at
        group = category[list(category.keys())[address[axis]]]

        # Iterate over a copy, as entries are removed along the way
        for candidate, count_card in list(remaining.items()):
            if candidate not in group.candidates:
                #del remaining[candidate] # Transcrypt NYI
                remaining.__delitem__(candidate)

    return remaining


def guard_or_doom(counter):
    """
    Guard or doom candidates as required

    For each inner cell of the constraint matrix: if the number elected has
    reached the cell's max, the hopeful candidates in that cell are doomed;
    if the number of candidates equals the cell's min, the hopeful
    candidates in that cell are guarded.

    Returns a list of log messages describing the changes made.
    """
    cm = counter._constraint_matrix
    messages = []

    for address in ndrange(cm.dimensions):
        cell = cm.get(address)

        if cell.elected == cell.max:
            # Doom remaining candidates in this group
            doomed = _hopefuls_in_cell(counter, address)

            for candidate, count_card in doomed.items():
                count_card.state = CandidateState.DOOMED

            if len(doomed) > 0:
                messages.append(counter.pretty_join([c.name for c, cc in doomed.items()]) + ' must be doomed to comply with constraints.')

        if cell.cands == cell.min:
            # Guard remaining candidates in this group
            guarded = _hopefuls_in_cell(counter, address)

            for candidate, count_card in guarded.items():
                count_card.state = CandidateState.GUARDED

            if len(guarded) > 0:
                messages.append(counter.pretty_join([c.name for c, cc in guarded.items()]) + ' must be guarded to comply with constraints.')

    return messages
|