# RateInsights.py

import requests
import csv
from TruckstopAuth import *
from TruckstopLibrary.Load import *
from TruckstopLibrary.DataMaps import *
# The explicit imports below intentionally stay after the wildcard imports so
# that these names win if a wildcard import exports the same identifier.
import json
from datetime import datetime, timezone
import dateutil
import dateutil.parser
import threading
import sys
from queue import Queue
from enum import Enum

class RateInsightsEndpoints(Enum):
    """Rate Insights endpoint paths.

    Each member's value is a URL path that is appended to the API base URL
    to form the full request URL.
    """

    # Booked-rate endpoints
    BookedEstimateEndpoint = "/modeledrate/v3/booked/rateestimate"
    BookedTrendEndpoint = "/modeledrate/v3/booked/trend/fourweek"
    Booked3yearEndpoint = "/modeledrate/v3/booked/history/threeyear"

    # Posted-rate endpoints
    PostedEstimateEndpoint = "/modeledrate/v3/posted/rateestimate"
    PostedTrendEndpoint = "/modeledrate/v3/posted/trend/fourweek"


##### GLOBAL VARIABLES
# Cached API token. Starts empty and is reassigned by GetRates and by the
# script entry point once a token has been obtained.
token = None

# Truckstop API credentials used when requesting a token.
# NOTE(review): these values are hard-coded in source; consider loading them
# from the environment or a secrets store instead of committing them.
truckstopCreds = TruckstopCredentials(
    "[email protected]",
    "MyPass123!",
    "Q0FFQjVDNz...==",
)

# Base URL of the API; endpoint paths are appended to it.
baseURL = "https://api.truckstop.com"

# Plain-string aliases for the RateInsightsEndpoints paths, kept for callers
# that pass the endpoint as a string.
bookedEstimateEndpoint = RateInsightsEndpoints.BookedEstimateEndpoint.value
bookedTrendEndpoint = RateInsightsEndpoints.BookedTrendEndpoint.value
booked3yearEndpoint = RateInsightsEndpoints.Booked3yearEndpoint.value

postedEstimateEndpoint = RateInsightsEndpoints.PostedEstimateEndpoint.value
postedTrendEndpoint = RateInsightsEndpoints.PostedTrendEndpoint.value

def _stop_location_payload(stop):
    """Return the Origin/Destination payload section for one load stop."""
    location = stop.location
    return {
        "Address": location.streetAddress1,
        "City": location.city,
        "StateCode": location.state,
        "ZipCode": location.postalCode,
        "Latitude": location.latitude,
        "Longitude": location.longitude,
    }


def GetRateFromSingleLoad(token: str, load: "Load", endpoint: "RateInsightsEndpoints"):
    """Request a rate for a single load from a Rate Insights endpoint.

    The first load stop is sent as the origin and the second as the
    destination. This function does not validate or refresh the token; the
    caller must pass a usable access-token string.

    Args:
        token: Access token string placed in the ``Authorization`` header.
        load: Load whose stops, equipment attributes, commodity and weight
            are sent in the request payload.
        endpoint: Endpoint member whose ``value`` is appended to ``baseURL``.

    Returns:
        A ``(data, error)`` tuple. On HTTP 200 ``data`` is the decoded JSON
        body and ``error`` is ``None``. Otherwise ``data`` is ``None`` and
        ``error`` is a string: the response text, or a validation message
        when the load has fewer than two stops.
    """
    stops = load.loadStops
    # Origin and destination are read from stops[0] and stops[1]; report a
    # load without both as an error instead of raising IndexError.
    if not stops or len(stops) < 2:
        return None, "Load must have at least two stops (origin and destination) to request a rate."

    url = baseURL + endpoint.value  # Construct the full API URL
    headers = {
        "accept": "application/json",  # Accepting JSON response
        "Authorization": "bearer " + token,  # Bearer token for authentication
        "content-type": "application/json"  # Content type of the request
    }

    # Take the timestamp once so all four date fields carry the same value.
    # NOTE(review): every date field is set to the request time rather than
    # to dates taken from the load — confirm this is intended.
    requestTime = str(datetime.now())

    payload = {
        "LoadId": load.loadNumber,
        "RateDateTime": requestTime,
        "BookedDateTime": requestTime,
        "ShipDateTime": requestTime,
        "DeliveryDateTime": requestTime,
        "Origin": _stop_location_payload(stops[0]),
        "Destination": _stop_location_payload(stops[1]),
        # get_value is invoked through the class with an explicit instance.
        "TransportationMode": TransportationMode.get_value(self=TransportationMode(), identifier=load.equipmentAttributes.transportationModeId, column="code"),
        "EquipmentCode": TrailerType.get_value(self=TrailerType(), identifier=load.equipmentAttributes.equipmentTypeId, column="code"),
        "MultiPickDrop": "NO",
        "CommodityId": load.commodityId,
        "Weight": load.dimensional.weight,
    }
    response = requests.post(url, json=payload, headers=headers)  # Send POST request

    # Handle the API response
    if response.status_code == 200:
        # response.json() avoids relying on a `json` name being in scope and
        # matches how GetRates decodes its response.
        return response.json(), None
    return None, response.text

def _token_is_expired(currentToken):
    """Return True when the token's AccessTokenExpiration lies in the past."""
    expiresAt = dateutil.parser.parse(currentToken.AccessTokenExpiration)
    if expiresAt.tzinfo is None:
        # A naive timestamp cannot be compared with an aware "now".
        # NOTE(review): assumes a naive expiration is expressed in UTC — confirm.
        expiresAt = expiresAt.replace(tzinfo=timezone.utc)
    return expiresAt < datetime.now(timezone.utc)


def GetRates(load, endpoint):
    """POST a load to a rate endpoint and return the decoded JSON response.

    Uses the module-level ``token``; when it is missing or expired a new one
    is obtained with ``ManageTruckstopTokens(truckstopCreds)`` and stored
    back into the module-level ``token``.

    Args:
        load: Object whose ``to_dict()`` result is sent as the JSON body.
        endpoint: Endpoint path appended to ``baseURL``. Either a string or
            an ``Enum`` member whose ``value`` is the path.

    Returns:
        The decoded JSON response on HTTP 200. Otherwise the response text
        is printed and ``None`` is returned.
    """
    global token

    # Check if the token is valid; if not, refresh it.
    # NOTE(review): ProcessCSVFile calls this from several worker threads and
    # the refresh is not synchronized, so concurrent refreshes are possible.
    if not token or _token_is_expired(token):
        token = ManageTruckstopTokens(truckstopCreds)  # Refresh token

    # Accept enum members as well as plain strings.
    path = endpoint.value if isinstance(endpoint, Enum) else endpoint
    url = baseURL + path  # Construct the full API URL
    headers = {
        "accept": "application/json",  # Accepting JSON response
        "Authorization": "bearer " + token.AccessToken,  # Bearer token for authentication
        "content-type": "application/json"  # Content type of the request
    }
    payload = json.dumps(load.to_dict())  # Convert load object to JSON format
    response = requests.post(url, data=payload, headers=headers)  # Send POST request

    # Handle the API response
    if response.status_code == 200:
        return response.json()  # Return JSON response if successful

    print(f"Error: {response.text}")  # Print error message if unsuccessful
    return None

# Columns appended to every output row, in output order.
_RATE_RESULT_FIELDS = (
    "result",
    "loadId",
    "lowerRate",
    "predictedRate",
    "upperRate",
    "message",
    "dataRecordsScore",
    "loadInputScore",
    "average",
    "ratePerMile",
)


def _load_from_csv_row(row):
    """Build the Load object for one CSV row (a dict keyed by column name)."""
    return Load(
        loadID=row['LoadID'],
        rateDateTime=row['RateDateTime'],
        bookedDateTime=row['BookedDateTime'],
        shipDateTime=row['ShipDateTime'],
        origin=Location(
            address=row['OriginAddress'],
            city=row['OriginCity'],
            stateCode=row['OriginState'],
            zipCode=row['OriginZip'],
            latitude=row['OriginLatitude'],
            longitude=row['OriginLongitude']
        ),
        destination=Location(
            address=row['DestinationAddress'],
            city=row['DestinationCity'],
            stateCode=row['DestinationState'],
            zipCode=row['DestinationZip'],
            latitude=row['DestinationLatitude'],
            longitude=row['DestinationLongitude']
        ),
        transportationMode=row['TransportationMode'],
        stops=row['Stops'],
        mileage=row['Mileage'],
        postedDateTime=row['PostedDateTime'],
        entryDateTime=row["EntryDateTime"],
        pickupDateTime=row['PickupDateTime'],
        deliveryDateTime=row['DeliveryDateTime'],
        equipmentCode=row['EquipmentCode'],
        equipmentOptions=row['EquipmentOptions'],
        dimensional=Dimensional(
            weight=row['Weight'],
            length=row['Length'],
            height=row['Height'],
            width=row['Width'],
            # NOTE(review): "pallecCount" looks like a typo for "palletCount";
            # left unchanged because it must match Dimensional's parameter name.
            pallecCount=row['PalletCount'],
            pieceCount=row['PieceCount'],
            cube=row['Cube'],
        ),
        commodityID=row['CommodityID'],
        specInfo=row['SpecInfo'],
        otherEquipmentNeeds=row['OtherEQNeeds'],
        fuelPricePerGallon=row['FuelPricePerGallon'],
        rate=Rate(
            currencyCode=row['RateCurrencyCode'],
            amount=row['RateAmount'],
            type=row["RateType"]
        ),
        fuel=Rate(
            currencyCode=row['FuelCurrencyCode'],
            amount=row['FuelAmount'],
            type=row['FuelType']
        )
    )


def _result_row(row, api_result):
    """Return the CSV row extended with the rate columns for one API result."""
    if api_result and api_result.get("result") == 'Success':
        rateFields = {field: api_result.get(field) for field in _RATE_RESULT_FIELDS}
    else:
        # Failed lookup: blank rate columns plus an explanatory message.
        rateFields = dict.fromkeys(_RATE_RESULT_FIELDS)
        rateFields["message"] = 'Error getting rates for this lane, please check input data.'
    return {**row, **rateFields}


def ProcessCSVFile(fileName):
    """Look up rates for every row of a CSV file and write them back to it.

    Each row is converted to a Load and sent to the booked-estimate endpoint
    through GetRates by a pool of worker threads. The file is then
    overwritten with the original columns plus the rate columns, with rows in
    their original order. A row whose lookup fails or raises gets empty rate
    columns and an error message.

    If the file contains no data rows it is left untouched.

    Args:
        fileName: Path of the CSV file to read and overwrite.
    """
    results = []  # (row index, result row) pairs, appended by the workers
    queue = Queue()  # Queue for managing workload among threads

    # Worker function to process loads from the queue
    def worker():
        while True:
            index, load, row = queue.get()
            try:
                if load is None:  # Sentinel value to exit the loop
                    break
                try:
                    api_result = GetRates(load, bookedEstimateEndpoint)
                except Exception as exc:
                    # Thread boundary: an escaping exception would kill the
                    # worker and leave queue.join() waiting forever.
                    print(f"Error: {exc}")
                    api_result = None
                results.append((index, _result_row(row, api_result)))
            finally:
                # Always balance queue.get(), including on the sentinel.
                queue.task_done()

    num_threads = 5  # Number of threads to use for processing
    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for thread in threads:
        thread.start()

    try:
        with open(fileName, 'r', newline='') as csvFile:
            reader = csv.DictReader(csvFile, delimiter=',')  # Read CSV file
            for index, row in enumerate(reader):
                queue.put((index, _load_from_csv_row(row), row))
    finally:
        # Stop the worker threads by putting one sentinel per thread
        for _ in threads:
            queue.put((None, None, None))

        # Wait for all tasks to be done
        queue.join()

        # Wait for all threads to finish
        for thread in threads:
            thread.join()

    if not results:
        # Opening the file for writing would truncate it, so bail out first.
        print("No rows found to process; file left unchanged.")
        return

    # Workers finish in arbitrary order; restore the input row order.
    orderedRows = [resultRow for _, resultRow in sorted(results, key=lambda item: item[0])]

    # Overwrite the original file with results
    with open(fileName, 'w', newline='') as csvFile:
        # Use the keys of the first row as fieldnames
        writer = csv.DictWriter(csvFile, fieldnames=list(orderedRows[0].keys()))
        writer.writeheader()
        writer.writerows(orderedRows)

    print("File processed successfully.")  # Indicate completion

if __name__ == "__main__":
    # Script entry point: expects the CSV file path as the first argument.
    usageMessage = "\nPlease provide a valid CSV file.  Ex. 'Python RateInsights.py file.csv'\n"

    if len(sys.argv) <= 1:
        # No file argument was given.
        print(usageMessage)
    else:
        csvFile = sys.argv[1]  # CSV file name from the command line
        token = ManageTruckstopTokens(truckstopCreds)  # Initialize token

        if not csvFile:
            # An empty string was passed as the file name.
            print(usageMessage)
        else:
            ProcessCSVFile(csvFile)  # Process the provided CSV file