import requests
import csv
from TruckstopAuth import *
from TruckstopLibrary.Load import *
from TruckstopLibrary.DataMaps import *
from datetime import datetime
import dateutil
import threading
import sys
from queue import Queue
from enum import Enum
class RateInsightsEndpoints(Enum):
BookedEstimateEndpoint = '/modeledrate/v3/booked/rateestimate'
BookedTrendEndpoint = '/modeledrate/v3/booked/trend/fourweek'
Booked3yearEndpoint = '/modeledrate/v3/booked/history/threeyear'
PostedEstimateEndpoint = '/modeledrate/v3/posted/rateestimate'
PostedTrendEndpoint = '/modeledrate/v3/posted/trend/fourweek'
##### GLOBAL VARIABLES
global token # Token for API authentication, updated in various functions
token = None
# Static Variables for Truckstop API Credentials
truckstopCreds = TruckstopCredentials(
"[email protected]",
"MyPass123!",
"Q0FFQjVDNz...=="
)
# Base URL and API endpoint definitions
baseURL = "https://api.truckstop.com"
bookedEstimateEndpoint = '/modeledrate/v3/booked/rateestimate'
bookedTrendEndpoint = '/modeledrate/v3/booked/trend/fourweek'
booked3yearEndpoint = '/modeledrate/v3/booked/history/threeyear'
postedEstimateEndpoint = '/modeledrate/v3/posted/rateestimate'
postedTrendEndpoint = '/modeledrate/v3/posted/trend/fourweek'
def GetRateFromSingleLoad(token: str, load:Load, endpoint:RateInsightsEndpoints):
# Check if the token is valid; if not, refresh it
url = baseURL + endpoint.value # Construct the full API URL
headers = {
"accept": "application/json", # Accepting JSON response
"Authorization": "bearer " + token, # Bearer token for authentication
"content-type": "application/json" # Content type of the request
}
payload = {
"LoadId": load.loadNumber,
"RateDateTime": str(datetime.now()),
"BookedDateTime": str(datetime.now()),
"ShipDateTime": str(datetime.now()),
"DeliveryDateTime": str(datetime.now()),
"Origin": {
"Address":load.loadStops[0].location.streetAddress1,
"City": load.loadStops[0].location.city,
"StateCode": load.loadStops[0].location.state,
"ZipCode":load.loadStops[0].location.postalCode,
"Latitude": load.loadStops[0].location.latitude,
"Longitude":load.loadStops[0].location.longitude
},
"Destination": {
"Address":load.loadStops[1].location.streetAddress1,
"City": load.loadStops[1].location.city,
"StateCode": load.loadStops[1].location.state,
"ZipCode":load.loadStops[1].location.postalCode,
"Latitude": load.loadStops[1].location.latitude,
"Longitude": load.loadStops[1].location.longitude
},
"TransportationMode": TransportationMode.get_value(self=TransportationMode(),identifier=load.equipmentAttributes.transportationModeId, column="code"),
"EquipmentCode": TrailerType.get_value(self=TrailerType(), identifier=load.equipmentAttributes.equipmentTypeId,column= "code"),
"MultiPickDrop": "NO",
"CommodityId": load.commodityId,
"Weight": load.dimensional.weight,
}
response = requests.post(url, json=payload, headers=headers) # Send POST request
# Handle the API response
if response.status_code == 200:
responseData = json.loads(response.text)
return responseData, None # Return JSON response if successful
else:
return None, response.text
def GetRates(load, endpoint):
global token
# Check if the token is valid; if not, refresh it
if not token or dateutil.parser.parse(token.AccessTokenExpiration) < datetime.datetime.now(datetime.timezone.utc):
token = ManageTruckstopTokens(truckstopCreds) # Refresh token
url = baseURL + endpoint # Construct the full API URL
headers = {
"accept": "application/json", # Accepting JSON response
"Authorization": "bearer " + token.AccessToken, # Bearer token for authentication
"content-type": "application/json" # Content type of the request
}
payload = json.dumps(load.to_dict()) # Convert load object to JSON format
response = requests.post(url, data=payload, headers=headers) # Send POST request
# Handle the API response
if response.status_code == 200:
return response.json() # Return JSON response if successful
else:
print(f"Error: {response.text}") # Print error message if unsuccessful
return None
def ProcessCSVFile(fileName):
results = [] # List to hold processed results
queue = Queue() # Queue for managing workload among threads
# Worker function to process loads from the queue
def worker():
while True:
load, row = queue.get() # Get the next load and corresponding row
if load is None: # Sentinel value to exit the loop
queue.task_done() # Mark the task as done
break
api_result = GetRates(load, bookedEstimateEndpoint) # Call API for rates
if api_result and api_result.get("result") == 'Success':
# If successful, create a result row with API data
result_row = {
**row, # Original CSV row
"result": api_result.get("result"),
"loadId": api_result.get("loadId"),
"lowerRate": api_result.get("lowerRate"),
"predictedRate": api_result.get("predictedRate"),
"upperRate": api_result.get("upperRate"),
"message": api_result.get("message"),
"dataRecordsScore": api_result.get("dataRecordsScore"),
"loadInputScore": api_result.get("loadInputScore"),
"average": api_result.get("average"),
"ratePerMile": api_result.get("ratePerMile")
}
results.append(result_row) # Append successful result
else:
# If API call failed, create a result row with error information
result_row = {
**row, # Original CSV row
"result": None,
"loadId": None,
"lowerRate": None,
"predictedRate": None,
"upperRate": None,
"message": 'Error getting rates for this lane, please check input data.',
"dataRecordsScore": None,
"loadInputScore": None,
"average": None,
"ratePerMile": None
}
results.append(result_row) # Append error result
queue.task_done() # Mark the task as done
num_threads = 5 # Number of threads to use for processing
threads = [] # List to keep track of thread objects
# Start worker threads
for _ in range(num_threads):
thread = threading.Thread(target=worker)
thread.start()
threads.append(thread)
try:
with open(fileName, 'r', newline='') as csvFile:
reader = csv.DictReader(csvFile, delimiter=',') # Read CSV file
for row in reader:
# Create a Load object from the CSV row
load = Load(
loadID=row['LoadID'],
rateDateTime=row['RateDateTime'],
bookedDateTime=row['BookedDateTime'],
shipDateTime=row['ShipDateTime'],
origin=Location(
address=row['OriginAddress'],
city=row['OriginCity'],
stateCode=row['OriginState'],
zipCode=row['OriginZip'],
latitude=row['OriginLatitude'],
longitude=row['OriginLongitude']
),
destination=Location(
address=row['DestinationAddress'],
city=row['DestinationCity'],
stateCode=row['DestinationState'],
zipCode=row['DestinationZip'],
latitude=row['DestinationLatitude'],
longitude=row['DestinationLongitude']
),
transportationMode=row['TransportationMode'],
stops=row['Stops'],
mileage=row['Mileage'],
postedDateTime=row['PostedDateTime'],
entryDateTime=row["EntryDateTime"],
pickupDateTime=row['PickupDateTime'],
deliveryDateTime=row['DeliveryDateTime'],
equipmentCode=row['EquipmentCode'],
equipmentOptions=row['EquipmentOptions'],
dimensional=Dimensional(
weight=row['Weight'],
length=row['Length'],
height=row['Height'],
width=row['Width'],
pallecCount=row['PalletCount'],
pieceCount=row['PieceCount'],
cube=row['Cube'],
),
commodityID=row['CommodityID'],
specInfo=row['SpecInfo'],
otherEquipmentNeeds=row['OtherEQNeeds'],
fuelPricePerGallon=row['FuelPricePerGallon'],
rate=Rate(
currencyCode=row['RateCurrencyCode'],
amount=row['RateAmount'],
type=row["RateType"]
),
fuel=Rate(
currencyCode=row['FuelCurrencyCode'],
amount=row['FuelAmount'],
type=row['FuelType']
)
)
queue.put((load, row)) # Add load and row to the queue for processing
finally:
# Stop the worker threads by putting sentinel values
for _ in threads:
queue.put((None, None)) # Sentinel value for each thread
# Wait for all tasks to be done
queue.join()
# Wait for all threads to finish
for thread in threads:
thread.join() # Ensure threads complete
# Overwrite the original file with results
with open(fileName, 'w', newline='') as csvFile:
fieldnames = list(results[0].keys()) # Use the keys of the first row as fieldnames
writer = csv.DictWriter(csvFile, fieldnames=fieldnames)
writer.writeheader() # Write header row to CSV
for result in results:
writer.writerow(result) # Write each result row to CSV
print("File processed successfully.") # Indicate completion
if __name__ == "__main__":
# Main execution block
if len(sys.argv) > 1:
csvFile = sys.argv[1] # Get the CSV file name from command-line arguments
token = ManageTruckstopTokens(truckstopCreds) # Initialize token
if csvFile:
ProcessCSVFile(csvFile) # Process the provided CSV file
else:
print("\nPlease provide a valid CSV file. Ex. 'Python RateInsights.py file.csv'\n")
else:
print("\nPlease provide a valid CSV file. Ex. 'Python RateInsights.py file.csv'\n")