Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate simplified version of fuzzy-based matchmaking algorithm #12

Open
pauldg opened this issue Aug 2, 2024 · 8 comments
Open

Integrate simplified version of fuzzy-based matchmaking algorithm #12

pauldg opened this issue Aug 2, 2024 · 8 comments

Comments

@pauldg
Copy link
Collaborator

pauldg commented Aug 2, 2024

Need @abdulrahmanazab to help written/graphical version of the algorithm using the stats we have which we can then integrate in the code base.

@abdulrahmanazab
Copy link
Contributor

Universal Assumption: all destinations in question are getting jobs only from Galaxy

First, we have a list of destinations that actually "can" take the job, i.e. for each of these destinations: Match(j.requirements,d.specifications) > 0.

what we need is an ordered list of these destinations based on the degree of matching, i.e. which d is the best for j

Second, we have a simple logic that is based on: queue size (how many jobs are currently queued to d) and the distance (latitude and longitude based distance calculation between the destination and the job input dataset location)

Now we want to improve this logic as we have new information available, about destination (d) and jobs of a specific tool (t):

  • Static information:
    • Existing-resources(d): static information on the CPU cores and Memory on d
  • Dynamic information:
    • Median-waiting-time(t,d): The median of all waiting times of all jobs of type (t) when they were submitted to destination (d) in the past
    • Median-run-time(t,d): The median of all execution times of all jobs of type (t) when they were submitted to destination (d) and were successfully executed in the past
  • Realtime-info(d):
    • Queue-size(d): Current total queue size of the destination
    • Running-jobs(d): The number current running jobs
    • [To be collected] Currently-consumed-cores(d):
      • Cores = 0
      • For each running job (j): Cores += Cores(j)
    • [To be collected] Currently-consumed-memory(d):
      • Memory = 0
      • For each running job (j): Memory += Memory(j) "The required memory by j, as it can be difficult to measure exactly how much memory is j using right now"

Based on all the above, the new logic will be composed

@abdulrahmanazab
Copy link
Contributor

abdulrahmanazab commented Sep 11, 2024

Scheduling algorithm logic:
Given:

  • Job requirements: # CPU-Cores, # Memory GiB, (other static requirements)
  • List of matching end-points, and for each:
    • Distance from the Data repo
    • Static and dynamic information (described above)

Algorithm logic:

  • For each Job (J):
    • Order the Endpoints list ascending by the closest to the data repo
    • Opt out the Endpoints with large estimated data transfer overhead
    • For each Endpoint (E):
      • If the currently consumed-cores and consumed-memory (E) NOT Match (J), remove E, Next
      • Queue Matching factor (qm(E)): 1/[Median-waiting-time(t,E)*Queue-size(E)]
      • Compute Matching factor (cm(E)): 1/[Median-run-time(t,E)*Running-jobs(E)]
      • Matching(J,E) = [qm(E) + cm(E)]
  • Order the Endpoints list Descending by the best match

@sanjaysrikakulam
Copy link
Member

@pauldg and I were discussing:

  1. In the above algorithm, we are not taking into account the input dataset size (also, the median input data set size)
  2. Handle cases where there is no median queue/run time because the destination could be new or the tool was never used/run and how do we weigh/rank in such cases.

@sanjaysrikakulam
Copy link
Member

sanjaysrikakulam commented Nov 7, 2024

A temp implementation of the algorithm:

Input:

  1. List of candidate destinations
  2. For each destination: (list(dicts))
    1. Total number of CPU cores and Memory available
    2. Median waiting time of the current tool in the queue
    3. Median running time of the current tool
    4. Current queue size of the destination
    5. Current number of jobs running on the destination
    6. Current number of unclaimed cores on the destination
    7. Current number of unclaimed memory on the destination
    8. Distance between the input data location and the destination
  3. Job requirements (dict)
    1. Number of CPU cores required
    2. Amount of memory required
def calculate_matching_score(destination: dict) -> float:
    """
    Calculate the matching score between a job and a destination.
    """
    median_waiting_time = destination.get('median_waiting_time', None)
    queue_size = destination.get('queue_size', 1)
    median_running_time = destination.get('median_running_time', None)
    running_jobs = destination.get('running_jobs', 1)

    # Queue matching factor (qm).
    if median_waiting_time > 0 and queue_size > 0:
        qm = 1 / (median_waiting_time * queue_size)
    else:
        qm = float('inf')

    # Compute matching factor (cm).
    if median_running_time > 0 and running_jobs > 0:
        cm = 1 / (median_running_time * running_jobs)
    else:
        cm = float('inf')

    # Final matching score
    return qm + cm


def select_best_destination(job_requirements: dict, destinations: list) -> list:
    """
    Selects the best destination for a job based on job requirements and destination metrics.
    """
    cpu_required = job_requirements['cpu_cores']
    memory_required = job_requirements['memory']

    # Filter out destinations that can't meet basic requirements based on the "real-time" data
    viable_destinations = []
    for dest in destinations:
        if dest['unclaimed_cores'] > cpu_required and dest['unclaimed_memory'] > memory_required:
            viable_destinations.append(dest)

    # Sort by distance to input data location (ascending)
    viable_destinations.sort(key=lambda x: x.get('distance_to_data', float('inf')))

    # Calculate matching scores for each viable destination
    for dest in viable_destinations:
        dest['matching_score'] = calculate_matching_score(dest)

    # Sort by matching score (descending)
    viable_destinations.sort(key=lambda x: x['matching_score'], reverse=True)

    return viable_destinations

@abdulrahmanazab
Copy link
Contributor

Ok, as we are now able to collect this information:

  • Current number of unclaimed cores on the destination
  • Current number of unclaimed memory on the destination
  • Distance between the input data location and the destination

This is excellent :) @sanjaysrikakulam your code is very good as a start. Now I think we should go with the usage of the three above parameter values as historical information, e.g. taking the collective historical # free cpus, # free memory, etc. from the DB and feed that into the Fuzzy input membership function. We can start with three Fuzzy membership functions: CPU, Memory, and Distance

@sanjaysrikakulam
Copy link
Member

Ok, as we are now able to collect this information:

* Current number of unclaimed cores on the destination

* Current number of unclaimed memory on the destination

* Distance between the input data location and the destination

This is excellent :) @sanjaysrikakulam your code is very good as a start. Now I think we should go with the usage of the three above parameter values as historical information, e.g. taking the collective historical # free cpus, # free memory, etc. from the DB and feed that into the Fuzzy input membership function. We can start with three Fuzzy membership functions: CPU, Memory, and Distance

The current version here already uses the unclaimed CPU, memory, and distance (but I'm not sure what you mean by "as historical information").

here is the updated code:

def calculate_matching_score(destination: dict) -> float:
    """
    Calculate the matching score between a job and a destination
    """
    median_waiting_time = destination.get('dest_tool_median_queue_time', None)
    queue_size = destination.get('dest_queue_count', 1)
    median_running_time = destination.get('dest_tool_median_run_time', None)
    running_jobs = destination.get('dest_run_count', 1)

    # Queue matching factor (qm).
    if median_waiting_time > 0 and queue_size > 0:
        qm = 1 / (median_waiting_time * queue_size)
    else:
        qm = float('inf')

    # Compute matching factor (cm).
    if median_running_time > 0 and running_jobs > 0:
        cm = 1 / (median_running_time * running_jobs)
    else:
        cm = float('inf')

    # Final matching score
    return qm + cm


def get_sorted_destinations(job_requirements: dict, destinations: list, objectstores: dict, dataset_attributes: dict) -> list:
    """
    Sorts the destinations based on the matching score and distance to the input data location.
    """
    sorted_destinations = []
    cpu_required = job_requirements['cpu_cores']
    memory_required = job_requirements['memory']

    # Filter out destinations that can't meet basic requirements based on the "real-time" data
    viable_destinations = []
    for dest in destinations:
        # Check if the destination_status is 'online'
        if dest['dest_status'] == 'online':
            # Check if the destination has enough resources
            if dest['dest_unconsumed_cpu'] > cpu_required and dest['dest_unconsumed_mem'] > memory_required:
                # Calculate the distance to the input data location
                dest['distance_to_data'] = closest_destination(dest, objectstores, dataset_attributes)
                viable_destinations.append(dest)

    # Fallback case if no viable destinations are found (e.g. no destination has enough resources)
    if not viable_destinations:
        for dest in destinations:
            dest['distance_to_data'] = closest_destination(dest, objectstores, dataset_attributes)
        sorted_destinations = sorted(destinations, key=lambda x: x['distance_to_data'])
        return [dest['destination_id'] for dest in sorted_destinations]

    # Sort by distance to input data location (ascending)
    viable_destinations.sort(key=lambda x: x['distance_to_data'])

    # Calculate matching scores for each viable destination
    for dest in viable_destinations:
        dest['matching_score'] = calculate_matching_score(dest)

    # Sort by matching score (descending)
    viable_destinations.sort(key=lambda x: x['matching_score'], reverse=True)

    sorted_destinations = [dest['destination_id'] for dest in viable_destinations]
    return sorted_destinations

@abdulrahmanazab
Copy link
Contributor

abdulrahmanazab commented Jan 8, 2025

In the context of meta-scheduling for Galaxy, it is crucial to collect histograms of available resources from the compute clusters (endpoints), rather than a collective or aggregate figure representing free CPUs and memory. This is because the resources in a cluster are spread across multiple machines, and each machine may have varying numbers of free CPUs and memory.

To capture this diversity accurately, a histogram approach should be used. Specifically, the histogram would report the number of machines in the cluster that have X number of free CPUs and X amount of free memory. This allows for a more detailed and accurate picture of the available resources in each cluster.

The histogram-like data would resemble the fuzzification process used in the fuzzy matchmaking algorithm described in the DAIS2014 paper. In that paper, fuzzy sets are used to categorize resource availability in degrees, such as "low," "medium," and "high," allowing for flexible matching between job requirements and available resources. By using histograms for resource availability, we can apply similar fuzzy logic techniques to match jobs with compute clusters more precisely, taking into account the full spectrum of resource availability on each machine, rather than a simple total number.
DAIS2014.pdf

@abdulrahmanazab
Copy link
Contributor

abdulrahmanazab commented Jan 8, 2025

Can be something like this

def calculate_matching_score(destination: dict) -> float:
    """
    Calculate the matching score between a job and a destination
    using a fuzzy-based matchmaking approach.
    """
    median_waiting_time = destination.get('dest_tool_median_queue_time', None)
    queue_size = destination.get('dest_queue_count', 1)
    median_running_time = destination.get('dest_tool_median_run_time', None)
    running_jobs = destination.get('dest_run_count', 1)

    # Queue matching factor (qm).
    if median_waiting_time > 0 and queue_size > 0:
        qm = 1 / (median_waiting_time * queue_size)
    else:
        qm = float('inf')

    # Compute matching factor (cm).
    if median_running_time > 0 and running_jobs > 0:
        cm = 1 / (median_running_time * running_jobs)
    else:
        cm = float('inf')

    # Final matching score
    return qm + cm

def collect_resource_histogram(destination: dict) -> dict:
    """
    Collect a histogram of available resources (CPU cores and memory)
    across multiple machines in the destination cluster.
    This histogram shows how many machines have X number of free CPUs
    and X amount of free memory, allowing for more accurate matching.
    """
    cpu_histogram = {}
    memory_histogram = {}
    
    # Collecting histograms for CPUs and memory
    for machine in destination.get('dest_machines', []):  # assuming 'dest_machines' contains the nodes
        free_cpus = machine.get('free_cpus', 0)
        free_memory = machine.get('free_memory', 0)
        
        # Update the CPU histogram
        if free_cpus in cpu_histogram:
            cpu_histogram[free_cpus] += 1
        else:
            cpu_histogram[free_cpus] = 1
        
        # Update the memory histogram
        if free_memory in memory_histogram:
            memory_histogram[free_memory] += 1
        else:
            memory_histogram[free_memory] = 1
    
    # Return the histograms for CPUs and memory
    return {'cpu_histogram': cpu_histogram, 'memory_histogram': memory_histogram}

def get_sorted_destinations(job_requirements: dict, destinations: list, objectstores: dict, dataset_attributes: dict) -> list:
    """
    Sorts the destinations based on the matching score and distance to the input data location.
    The sorting considers a histogram of free resources (CPU and memory) rather than aggregated values.
    """
    sorted_destinations = []
    cpu_required = job_requirements['cpu_cores']
    memory_required = job_requirements['memory']

    # Filter out destinations that can't meet basic requirements based on the "real-time" data
    viable_destinations = []
    for dest in destinations:
        # Check if the destination_status is 'online'
        if dest['dest_status'] == 'online':
            # Collect resource histograms for the destination
            histograms = collect_resource_histogram(dest)
            cpu_histogram = histograms['cpu_histogram']
            memory_histogram = histograms['memory_histogram']
            
            # Check if any machine in the cluster has enough free CPUs and memory
            has_sufficient_resources = False
            for free_cpus, count in cpu_histogram.items():
                if free_cpus >= cpu_required:
                    has_sufficient_resources = True
                    break
            
            for free_memory, count in memory_histogram.items():
                if free_memory >= memory_required:
                    has_sufficient_resources = True
                    break
            
            # Only consider this destination if it has enough resources
            if has_sufficient_resources:
                # Calculate the distance to the input data location
                dest['distance_to_data'] = closest_destination(dest, objectstores, dataset_attributes)
                viable_destinations.append(dest)

    # Fallback case if no viable destinations are found
    if not viable_destinations:
        for dest in destinations:
            dest['distance_to_data'] = closest_destination(dest, objectstores, dataset_attributes)
        sorted_destinations = sorted(destinations, key=lambda x: x['distance_to_data'])
        return [dest['destination_id'] for dest in sorted_destinations]

    # Sort by distance to input data location (ascending)
    viable_destinations.sort(key=lambda x: x['distance_to_data'])

    # Calculate matching scores for each viable destination
    for dest in viable_destinations:
        dest['matching_score'] = calculate_matching_score(dest)

    # Sort by matching score (descending)
    viable_destinations.sort(key=lambda x: x['matching_score'], reverse=True)

    sorted_destinations = [dest['destination_id'] for dest in viable_destinations]
    return sorted_destinations

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

When branches are created from issues, their pull requests are automatically linked.

3 participants