Skip to content
Snippets Groups Projects
Commit d2a1c453 authored by nguyed99's avatar nguyed99 Committed by jakut77
Browse files

Include BHT

parent 5fc24aaa
No related branches found
No related tags found
No related merge requests found
""" """
This module implements the Barnes Hut Tree Algorithm This module implements the Barnes Hut Tree (BHT) Algorithm for 2D data.
""" """
import numpy as np import numpy as np
...@@ -318,4 +318,4 @@ class TreeNode: ...@@ -318,4 +318,4 @@ class TreeNode:
self.mass = total_mass self.mass = total_mass
else: else:
self.center_of_mass = np.array([0.0, 0.0]) self.center_of_mass = np.array([0.0, 0.0])
self.mass = 0 self.mass = 0
\ No newline at end of file
"""
This module implements the Barnes Hut Tree (BHT) Algorithm for 3D data.
"""
import numpy as np
class MainApp:
def __init__(self, particles):
"""Initialize the MainApp with a root node."""
self.rootNode = TreeNode(x=0, y=0, z=0, width=200, height=200, depth=200)
def BuildTree(self, particles):
"""Build the Octree by inserting particles.
Args:
particles (list): A list of Particle objects to be inserted into the Octree.
"""
self.ResetTree(particles) # Empty the tree
for particle in particles:
self.rootNode.insert(particle)
def ResetTree(self, particles):
"""Reset the Quadtree by reinitializing the root node."""
# Define the size of the rootNode based on the positions of the particles
#min_x = min([particle.x for particle in particles])
#min_y = min([particle.y for particle in particles])
#min_z = min([particle.z for particle in particles])
#max_x = max([particle.x for particle in particles])
#max_y = max([particle.y for particle in particles])
#max_z = max([particle.z for particle in particles])
#root_height = max_y - min_y
#root_width = max_x - min_x
#root_depth = max_z - min_z
#self.rootNode = TreeNode(x=min_x, y=min_y, z=min_z, width=root_width, height=root_height, depth=root_depth)
self.rootNode = TreeNode(x=0, y=0, z=0, width=200, height=200, depth=200)
class Particle:
def __init__(self, x, y, z, mass):
"""Initialize a Particle with x, y, z coordinates and mass."""
self.x = x
self.y = y
self.z = z
self.mass = mass
self.vx = 0.0 # Velocity component in x direction
self.vy = 0.0 # Velocity component in y direction
self.vz = 0.0 # Velocity component in z direction
self.fx = 0.0 # Force component in x direction
self.fy = 0.0 # Force component in y direction
self.fz = 0.0 # Force component in z direction
class TreeNode:
def __init__(self, x, y, z, width, height, depth):
"""Initialize a TreeNode representing a octant in the Octree.
Args:
x (float): x-coordinate of the node.
y (float): y-coordinate of the node.
z (float): z-coordinate of the node.
width (float): Width of the node.
height (float): Height of the node.
depth (float): Depth of the node.
"""
self.x = x
self.y = y
self.z = z
self.width = width
self.height = height
self.depth = depth
self.particle = None # Particle contained in this node
self.center_of_mass = np.array([(x + width) / 2, (y + height) / 2, (z + depth) / 2])
self.total_mass = 0 # Total mass contained in the node
self.children = np.empty(8, dtype=object) # Children nodes (F-SW, F-SE, F-NW, F-NE, B-SW, B-SE, B-NW, B-NE,)
def contains(self, particle):
"""Check if the particle is within the bounds of this node.
Args:
particle (Particle): The particle to be checked.
Returns:
bool: True if the particle is within the node's bounds, False otherwise.
"""
return (self.x <= particle.x <= self.x + self.width and self.y <= particle.y <= self.y + self.height
and self.z <= particle.z <= self.z + self.depth)
def insert(self, particle):
"""Insert a particle into the Octree.
Args:
particle (Particle): The particle to be inserted.
Returns:
bool: True if the particle is inserted, False otherwise.
"""
if not self.contains(particle):
return False # Particle doesn't belong in this node
if self.particle is None and all(child is None for child in self.children):
# If the node is empty and has no children, insert particle here.
self.particle = particle
#print(f'particle inserted: x={round(self.particle.x,2)}, y={round(self.particle.y,2)}, , z={round(self.particle.z,2)}')
return True # Particle inserted in an empty node
if all(child is None for child in self.children):
# If no children exist, create and insert both particles
self.subdivide()
self.insert(self.particle) # Reinsert existing particle into subnode
self.insert(particle) # Insert new particle
self.particle = None # Clear particle from this node
else:
# If the node has children, insert particle in the child node.
oct_index = self.get_octant(particle)
if self.children[oct_index] is None:
print('Missing node:', self.children)
# Create a child node if it doesn't exist
self.children[oct_index] = TreeNode(self.x + (oct_index % 2) * (self.width / 2),
self.y + (oct_index // 2) * (self.height / 2),
self.z + (oct_index // 2) * (self.depth / 2), self.width / 2,
self.height / 2, self.depth / 2)
self.children[oct_index].insert(particle)
def subdivide(self):
"""Create the children of the node."""
sub_width = self.width / 2
sub_height = self.height / 2
sub_depth = self.depth / 2
self.children[0] = TreeNode(self.x, self.y, self.z, sub_width, sub_height, sub_depth) # B-SW
self.children[1] = TreeNode(self.x + sub_width, self.y, self.z, sub_width, sub_height, sub_depth) # B-SE
self.children[2] = TreeNode(self.x, self.y + sub_height, self.z, sub_width, sub_height, sub_depth) # B-NW
self.children[3] = TreeNode(self.x + sub_width, self.y + sub_height, self.z, sub_width, sub_height,
sub_depth) # B-NE
self.children[4] = TreeNode(self.x, self.y, self.z + sub_depth, sub_width, sub_height, sub_depth) # T-SW
self.children[5] = TreeNode(self.x + sub_width, self.y, self.z + sub_depth, sub_width, sub_height,
sub_depth) # T-SE
self.children[6] = TreeNode(self.x, self.y + sub_height, self.z + sub_depth, sub_width, sub_height,
sub_depth) # T-NW
self.children[7] = TreeNode(self.x + sub_width, self.y + sub_height, self.z + sub_depth, sub_width, sub_height,
sub_depth) # T-NE
def get_octant(self, particle):
"""Determine the octant index for a particle based on its position.
Args:
particle (Particle): The particle to determine the octant index for.
Returns:
int: Octant index
B - Bottom, T - Top
(0 for B-SW, 1 for B-SE, 2 for B-NW, 3 for B-NE,
4 for T-SW, 5 for T-SE, 6 for T-NW, 7 for T-NE).
"""
# Determine separating planes
mid_x = self.x + self.width / 2
mid_y = self.y + self.height / 2
mid_z = self.z + self.depth / 2
quad_index = (particle.x >= mid_x) + 2 * (particle.y >= mid_y) + 4 * (particle.z >= mid_z)
return quad_index
def print_tree(self, depth_=0):
"""Print the structure of the Octree.
Args:
depth_ (int): Current depth in the tree (for indentation in print).
"""
if self.particle:
print(
f"{' ' * depth_}Particle at ({round(self.particle.x,2)}, {round(self.particle.y,2)}, {round(self.particle.z,2)}) in Node ({round(self.x,2)}, {round(self.y,2)}, {round(self.z,2)}), width={round(self.width,2)}, height={round(self.height,2)}, depth={round(self.depth,2)}"
)
else:
print(
f"{' ' * depth_}Node ({round(self.x,2)}, {round(self.y,2)}, {round(self.z,2)}) - Width: {round(self.width,2)}, Height: {round(self.height,2)}, Depth: {round(self.depth,2)}"
)
for child in self.children:
if child:
child.print_tree(depth_ + 2)
def ComputeMassDistribution(self):
"""Compute the mass distribution for the tree nodes.
This function calculates the total mass and the center of mass
for each node in the Octree. It's a recursive function that
computes the mass distribution starting from the current node.
Note:
This method modifies the 'total_mass' and 'center_of_mass' attributes
for each node in the Octree.
Returns:
None
"""
if self.particle is not None:
# Node contains only one particle
self.center_of_mass = np.array([self.particle.x, self.particle.y, self.particle.z])
self.total_mass = self.particle.mass
else:
# Multiple particles in node
total_mass = 0
center_of_mass_accumulator = np.array([0.0, 0.0, 0.0])
for child in self.children:
if child is not None:
# Recursively compute mass distribution for child nodes
child.ComputeMassDistribution()
total_mass += child.total_mass
center_of_mass_accumulator += child.total_mass * child.center_of_mass
if total_mass > 0:
self.center_of_mass = center_of_mass_accumulator / total_mass
self.total_mass = total_mass
else:
# If total mass is 0 or no child nodes have mass, leave values as default
pass
#self.center_of_mass = np.array([(x+width)/2, (y+height)/2, (z+depth)/2])
#self.total_mass = 0
def CalculateForceFromTree(self, target_particle, theta=1.0):
"""Calculate the force on a target particle using the Barnes-Hut algorithm.
Args:
target_particle (Particle): The particle for which the force is calculated.
theta (float): The Barnes-Hut criterion for force approximation.
Returns:
np.ndarray: The total force acting on the target particle.
"""
total_force = np.array([0.0, 0.0, 0.0])
if self.particle is not None:
# Node contains only one particle
if self.particle != target_particle:
# Calculate gravitational force between target_particle and node's particle
force = self.GravitationalForce(target_particle, self.particle)
total_force += force
else:
if self.total_mass == 0:
return total_force
r = np.linalg.norm(
np.array([target_particle.x, target_particle.y, target_particle.z]) - self.center_of_mass)
d = max(self.width, self.height, self.depth)
if d / r < theta:
# Calculate gravitational force between target_particle and "node particle" representing cluster
force = self.GravitationalForce(
target_particle,
Particle(self.center_of_mass[0], self.center_of_mass[1], self.center_of_mass[2], self.total_mass))
total_force += force
else:
for child in self.children:
if child is not None:
# Recursively calculate force from child nodes
if target_particle is not None: # Check if the target_particle is not None
force = child.CalculateForceFromTree(target_particle)
total_force += force
return total_force
def CalculateForce(self, target_particle, particle, theta=1.0):
"""Calculate the gravitational force between two particles.
Args:
target_particle (Particle): The particle for which the force is calculated.
particle (Particle): The particle exerting the force.
Returns:
np.ndarray: The force vector acting on the target particle due to 'particle'.
"""
force = np.array([0.0, 0.0, 0.0])
print('function CalculateForce is called')
if self.particle is not None:
# Node contains only one particle
if self.particle != target_particle:
# Calculate gravitational force between target_particle and node's particle
force = self.GravitationalForce(target_particle, self.particle)
else:
if target_particle is not None and particle is not None: # Check if both particles are not None
r = np.linalg.norm(
np.array([target_particle.x, target_particle.y, target_particle.z]) -
np.array([particle.x, particle.y, particle.z]))
d = max(self.width, self.height, self.depth)
if d / r < theta:
# Calculate gravitational force between target_particle and particle
force = self.GravitationalForce(target_particle, particle)
else:
for child in self.children:
if child is not None:
# Recursively calculate force from child nodes
force += child.CalculateForce(target_particle, particle)
return force
def GravitationalForce(self, particle1, particle2):
"""Calculate the gravitational force between two particles.
Args:
particle1 (Particle): First particle.
particle2 (Particle): Second particle.
Returns:
np.ndarray: The gravitational force vector between particle1 and particle2.
"""
#G = 6.674 * (10 ** -11) # Gravitational constant
G = 1
dx = particle2.x - particle1.x
dy = particle2.y - particle1.y
dz = particle2.z - particle1.z
cutoff_radius = 5
r = max(np.sqrt(dx**2 + dy**2 + dz**2), cutoff_radius)
force_magnitude = G * particle1.mass * particle2.mass / (r**2)
force_x = force_magnitude * (dx / r)
force_y = force_magnitude * (dy / r)
force_z = force_magnitude * (dz / r)
return np.array([force_x, force_y, force_z])
# Helper method to retrieve all particles in the subtree
def particles_in_subtree(self):
"""Retrieve all particles in the subtree rooted at this node.
Returns:
list: A list of particles in the subtree rooted at this node.
"""
particles = []
if self.particle is not None:
particles.append(self.particle)
else:
for child in self.children:
if child is not None:
particles.extend(child.particles_in_subtree())
return len(particles), particles
def compute_center_of_mass(self):
"""Compute the center of mass for the node."""
print('Function compute_center_of_mass is called')
if self.particle is not None:
self.center_of_mass = np.array([self.particle.x, self.particle.y, self.particle.z])
self.mass = self.particle.mass
else:
total_mass = 0
center_of_mass_accumulator = np.array([0.0, 0.0, 0.0])
for child in self.children:
if child is not None:
child.compute_center_of_mass()
total_mass += child.mass
center_of_mass_accumulator += child.mass * child.center_of_mass
if total_mass > 0:
self.center_of_mass = center_of_mass_accumulator / total_mass
self.mass = total_mass
else:
self.center_of_mass = np.array([0.0, 0.0, 0.0])
self.mass = 0
\ No newline at end of file
""" """
This module contains base class for a n-body gravitational system This module contains base class for a n-body gravitational system.
with 2 methods of simulations: direct and approximated force field.
""" """
from collections.abc import Callable from collections.abc import Callable
...@@ -23,12 +22,12 @@ class GravitationalSystem: ...@@ -23,12 +22,12 @@ class GravitationalSystem:
assert self.r0.shape == self.v0.shape assert self.r0.shape == self.v0.shape
assert self.m.shape[0] == self.r0.shape[0] assert self.m.shape[0] == self.r0.shape[0]
solvers = ['verlet'] solvers = ['verlet', 'bht']
assert self.solver.__name__ in solvers assert self.solver.__name__ in solvers
def direct_simulation(self): def simulation(self):
""" """
Using integrator to compute trajector in phase space Using integrator to compute trajectory in phase space
""" """
p = np.zeros((len(self.t), *self.v0.shape)) p = np.zeros((len(self.t), *self.v0.shape))
q = np.zeros((len(self.t), *self.r0.shape)) q = np.zeros((len(self.t), *self.r0.shape))
...@@ -39,19 +38,4 @@ class GravitationalSystem: ...@@ -39,19 +38,4 @@ class GravitationalSystem:
dt = self.t[i] - self.t[i - 1] dt = self.t[i] - self.t[i - 1]
p[i], q[i] = self.solver(self.force, q[i - 1], p[i - 1], self.m, dt) p[i], q[i] = self.solver(self.force, q[i - 1], p[i - 1], self.m, dt)
return self.t, p, q return self.t, p, q
\ No newline at end of file
def force_field(self):
"""
Using force field method to compute trajector in phase space
"""
p = np.zeros((len(self.t), *self.v0.shape))
q = np.zeros((len(self.t), *self.r0.shape))
q[0] = self.r0
p[0] = self.m * self.v0
for i in range(1, len(self.t)):
# something with dt = self.t[i] - self.t[i-1]
# p = m*v
pass
return self.t, p, q
...@@ -14,7 +14,7 @@ import numpy as np ...@@ -14,7 +14,7 @@ import numpy as np
from jobs.src.integrators import verlet from jobs.src.integrators import verlet
from jobs.src.system import GravitationalSystem from jobs.src.system import GravitationalSystem
from jobs.src.bht_algorithm import MainApp, Particle from jobs.src.bht_algorithm_2D import MainApp, Particle
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
...@@ -59,11 +59,11 @@ def force(q: np.ndarray) -> np.ndarray: ...@@ -59,11 +59,11 @@ def force(q: np.ndarray) -> np.ndarray:
return np.concatenate((f_earth, f_moon), axis=0) return np.concatenate((f_earth, f_moon), axis=0)
class BarnesHutTest(unittest.TestCase): class IntegratorTest(unittest.TestCase):
def test_bht(self): def test_bht(self):
""" """
Test functionalities of velocity-Verlet algorithm using BHT tree Test functionalities of velocity-Verlet algorithm
""" """
# vector of r0 and v0 # vector of r0 and v0
x0 = np.array([ x0 = np.array([
...@@ -84,24 +84,21 @@ class BarnesHutTest(unittest.TestCase): ...@@ -84,24 +84,21 @@ class BarnesHutTest(unittest.TestCase):
force=force, force=force,
solver=verlet) solver=verlet)
t, p, q = system.direct_simulation() t, p, q = system.simulation()
## checking total energy conservation ## checking total energy conservation
H = np.linalg.norm(p[:,:2], axis=1)**2 / (2 * M_E) + np.linalg.norm(p[:,2:], axis=1)**2 / (2 * M_L) + \ H = np.linalg.norm(p[:,:2], axis=1)**2 / (2 * M_E) + np.linalg.norm(p[:,2:], axis=1)**2 / (2 * M_L) + \
-G * M_S * M_E / np.linalg.norm(q[:,:2], axis=1) - G * M_S * M_L / np.linalg.norm(q[:,2:], axis=1) + \ -G * M_S * M_E / np.linalg.norm(q[:,:2], axis=1) - G * M_S * M_L / np.linalg.norm(q[:,2:], axis=1) + \
-G * M_E * M_L / np.linalg.norm(q[:,2:] - q[:,:2], axis=1) -G * M_E * M_L / np.linalg.norm(q[:,2:] - q[:,:2], axis=1)
logger.info(f"{H=}")
self.assertTrue(np.greater(1e-7 + np.zeros(H.shape[0]), H - H[0]).all()) self.assertTrue(np.greater(1e-7 + np.zeros(H.shape[0]), H - H[0]).all())
## checking total linear momentum conservation ## checking total linear momentum conservation
P = p[:, :2] + p[:, 2:] P = p[:, :2] + p[:, 2:]
self.assertTrue(np.greater(1e-10 + np.zeros(P[0].shape), P - P[0]).all()) self.assertTrue(np.greater(1e-10 + np.zeros(P[0].shape), P - P[0]).all())
## checking total angular momentum conservation ## checking total angular momentum conservation
L = np.cross(q[:, :2], p[:, :2]) + np.cross(q[:, 2:], p[:, 2:]) L = np.cross(q[:, :2], p[:, :2]) + np.cross(q[:, 2:], p[:, 2:])
logger.info(f"{L=}")
self.assertTrue(np.greater(1e-8 + np.zeros(L.shape[0]), L - L[0]).all()) self.assertTrue(np.greater(1e-8 + np.zeros(L.shape[0]), L - L[0]).all())
## checking error ## checking error
...@@ -116,7 +113,7 @@ class BarnesHutTest(unittest.TestCase): ...@@ -116,7 +113,7 @@ class BarnesHutTest(unittest.TestCase):
force=force, force=force,
solver=verlet) solver=verlet)
t, p_t, q_t = system.direct_simulation() t, p_t, q_t = system.simulation()
H = np.linalg.norm(p_t[:,:2], axis=1)**2 / (2 * M_E) + np.linalg.norm(p_t[:,2:], axis=1)**2 / (2 * M_L) + \ H = np.linalg.norm(p_t[:,:2], axis=1)**2 / (2 * M_E) + np.linalg.norm(p_t[:,2:], axis=1)**2 / (2 * M_L) + \
...@@ -144,7 +141,7 @@ class BarnesHutTest(unittest.TestCase): ...@@ -144,7 +141,7 @@ class BarnesHutTest(unittest.TestCase):
force=force, force=force,
solver=verlet) solver=verlet)
t, p_reverse, q_reverse = system.direct_simulation() t, p_reverse, q_reverse = system.simulation()
self.assertTrue(np.greater(1e-10 + np.zeros(4), q_reverse[-1] - q[0]).all()) self.assertTrue(np.greater(1e-10 + np.zeros(4), q_reverse[-1] - q[0]).all())
self.assertTrue(np.greater(1e-10 + np.zeros(4), p_reverse[-1] - p[0]).all()) self.assertTrue(np.greater(1e-10 + np.zeros(4), p_reverse[-1] - p[0]).all())
......
...@@ -73,7 +73,7 @@ class IntegratorTest(unittest.TestCase): ...@@ -73,7 +73,7 @@ class IntegratorTest(unittest.TestCase):
force=force, force=force,
solver=verlet) solver=verlet)
t, p, q = system.direct_simulation() t, p, q = system.simulation()
## checking total energy conservation ## checking total energy conservation
H = np.linalg.norm(p[:,:2], axis=1)**2 / (2 * M_E) + np.linalg.norm(p[:,2:], axis=1)**2 / (2 * M_L) + \ H = np.linalg.norm(p[:,:2], axis=1)**2 / (2 * M_E) + np.linalg.norm(p[:,2:], axis=1)**2 / (2 * M_L) + \
...@@ -102,7 +102,7 @@ class IntegratorTest(unittest.TestCase): ...@@ -102,7 +102,7 @@ class IntegratorTest(unittest.TestCase):
force=force, force=force,
solver=verlet) solver=verlet)
t, p_t, q_t = system.direct_simulation() t, p_t, q_t = system.simulation()
H = np.linalg.norm(p_t[:,:2], axis=1)**2 / (2 * M_E) + np.linalg.norm(p_t[:,2:], axis=1)**2 / (2 * M_L) + \ H = np.linalg.norm(p_t[:,:2], axis=1)**2 / (2 * M_E) + np.linalg.norm(p_t[:,2:], axis=1)**2 / (2 * M_L) + \
...@@ -130,7 +130,7 @@ class IntegratorTest(unittest.TestCase): ...@@ -130,7 +130,7 @@ class IntegratorTest(unittest.TestCase):
force=force, force=force,
solver=verlet) solver=verlet)
t, p_reverse, q_reverse = system.direct_simulation() t, p_reverse, q_reverse = system.simulation()
self.assertTrue(np.greater(1e-10 + np.zeros(4), q_reverse[-1] - q[0]).all()) self.assertTrue(np.greater(1e-10 + np.zeros(4), q_reverse[-1] - q[0]).all())
self.assertTrue(np.greater(1e-10 + np.zeros(4), p_reverse[-1] - p[0]).all()) self.assertTrue(np.greater(1e-10 + np.zeros(4), p_reverse[-1] - p[0]).all())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment