# RiskFactor.py

import json
import os

import requests

from TruckstopAuth import ManageTruckstopTokens
from TruckstopLibrary.Auth import TruckstopCredentials
from TruckstopLibrary.RiskFactor import RiskFactorReport

# SECURITY(review): live credentials are committed in this file. Each value can
# be overridden through an environment variable; the literals remain only as
# fallbacks so existing runs keep working unchanged. Rotate these secrets and
# delete the fallbacks once the environment variables are provisioned.
# The positional order expected by TruckstopCredentials is preserved.
truckstopUserProd = TruckstopCredentials(
    os.environ.get("TRUCKSTOP_USERNAME", "[email protected]"),
    os.environ.get("TRUCKSTOP_PASSWORD", "wH4oi#8mx5LInd"),
    # Third positional argument; presumably an API client token -- confirm
    # against TruckstopLibrary.Auth.TruckstopCredentials.
    os.environ.get(
        "TRUCKSTOP_CLIENT_TOKEN",
        "Q0FFQjVDNzEtNzNGQS00MTA5LTlGQ0ItMjIzQTcxM0M1ODMwOjU5MERCOEI5LTczQjAtNDdCRC1BOEZFLTdCMzIwMTBBNjA2QQ==",
    ),
)

# Carriers endpoint of the risk-factor analysis API. The trailing slash is
# relied on by the functions that append a carrier identifier directly.
baseURL = "https://api.truckstop.com/riskfactoranalysis/v1/carriers/"

def GetReportByIdentifier(accessToken:str, identifier:str, timeout:float = 30):
    """Fetch the risk-factor report for a single carrier.

    Args:
        accessToken: Bearer token sent in the ``Authorization`` header.
        identifier: Carrier identifier appended to ``baseURL``. It is passed
            through ``str``, so integers are accepted as well.
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` may block indefinitely on an unresponsive server.

    Returns:
        ``(report, None)`` on HTTP 200, where ``report`` comes from
        ``RiskFactorReport.from_dict``. Otherwise ``(None, error)``:

        * ``error`` is the exception text (``str``) when a 200 response
          cannot be converted into a report;
        * ``error`` is the ``"errors"`` entry of the response body for
          non-200 responses;
        * ``error`` is ``{"response": ["HTTP <status>: <body>"]}`` when a
          non-200 body is not a JSON object or has no ``"errors"`` entry.

    Raises:
        requests.RequestException: On connection failures or timeouts.
    """
    url = baseURL + str(identifier)
    headers = {
        "Authorization": "bearer " + accessToken,
        "Accept": "application/json",
    }
    response = requests.get(url=url, headers=headers, timeout=timeout)

    if response.status_code == 200:
        try:
            return RiskFactorReport.from_dict(json.loads(response.text)), None
        except Exception as e:
            # Existing contract: any decode/parse failure is reported as text
            # rather than raised.
            return None, str(e)

    # Error bodies are not guaranteed to be JSON (e.g. gateway error pages),
    # so fall back to a dict-of-lists shaped like the API's "errors" payload.
    fallback = {"response": [f"HTTP {response.status_code}: {response.text}"]}
    try:
        body = json.loads(response.text)
    except ValueError:
        return None, fallback
    errors = body.get("errors") if isinstance(body, dict) else None
    return None, (errors if errors is not None else fallback)

def SearchForCarrier(accessToken:str, email:str = None, phoneNumber:str = None, vin:str = None, timeout:float = 30):
    """Search for carriers by email, phone number and/or VIN.

    Args:
        accessToken: Bearer token sent in the ``Authorization`` header.
        email: Optional value for the ``email`` query parameter.
        phoneNumber: Optional value for the ``phone`` query parameter.
        vin: Optional value for the ``vin`` query parameter.
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` may block indefinitely on an unresponsive server.

    Returns:
        ``(dotNumbers, None)`` on HTTP 200, where ``dotNumbers`` is the
        ``"dotNumbers"`` entry of the response body. Otherwise
        ``(None, error)``, where ``error`` is the ``"errors"`` entry of the
        response body, or ``{"response": ["HTTP <status>: <body>"]}`` when the
        body is not a JSON object or has no ``"errors"`` entry.

    Raises:
        requests.RequestException: On connection failures or timeouts.
        ValueError: If a 200 response body is not valid JSON.
    """
    # Only the criteria that were actually supplied are sent. Passing them via
    # ``params`` lets requests URL-encode the values, so characters such as
    # "+" or "&" in an email or phone number no longer corrupt the query.
    candidates = (("email", email), ("phone", phoneNumber), ("vin", vin))
    query = {key: value for key, value in candidates if value is not None}

    # baseURL ends with "/", so strip it to avoid ".../carriers//search".
    url = baseURL.rstrip("/") + "/search"
    headers = {
        "Authorization": "bearer " + accessToken,
        "Accept": "application/json"
    }
    response = requests.get(url=url, headers=headers, params=query, timeout=timeout)

    if response.status_code == 200:
        dotNumbers = json.loads(response.text).get("dotNumbers")
        return dotNumbers, None

    # Error bodies are not guaranteed to be JSON (e.g. gateway error pages),
    # so fall back to a dict-of-lists shaped like the API's "errors" payload.
    fallback = {"response": [f"HTTP {response.status_code}: {response.text}"]}
    try:
        body = json.loads(response.text)
    except ValueError:
        return None, fallback
    errorText = body.get("errors") if isinstance(body, dict) else None
    return None, (errorText if errorText is not None else fallback)

def GetBulkReports(accessToken:str, identifiers:list[str], timeout:float = 30):
    """Fetch risk-factor reports for several carriers in one request.

    Posts ``{"carrierIds": identifiers}`` to ``baseURL`` and prints a short
    summary of how many of the queried carriers were found.

    Args:
        accessToken: Bearer token sent in the ``Authorization`` header.
        identifiers: Carrier identifiers to look up.
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` may block indefinitely on an unresponsive server.

    Returns:
        On HTTP 200, ``(reportList, errorList)``: the reports that
        ``RiskFactorReport.from_dict`` could build, and the exception text
        (``str``) for each entry that could not be converted.
        Otherwise ``(None, error)``, where ``error`` is the ``"errors"`` entry
        of the response body, or ``{"response": ["HTTP <status>: <body>"]}``
        when the body is not a JSON object or has no ``"errors"`` entry.

    Raises:
        requests.RequestException: On connection failures or timeouts.
        ValueError: If a 200 response body is not valid JSON.
    """
    headers = {
        "Authorization": "bearer " + accessToken,
        "Accept": "application/json",
    }
    payload = {
        "carrierIds": identifiers
    }
    response = requests.post(url=baseURL, headers=headers, json=payload, timeout=timeout)

    if response.status_code == 200:
        body = json.loads(response.text)

        # Values are pulled out before formatting: reusing double quotes inside
        # a double-quoted f-string is a SyntaxError before Python 3.12.
        resultCount = body.get("resultCount") or {}
        knownCount = resultCount.get("knownCarrierIdCount")
        queriedCount = resultCount.get("queriedCarrierIdCount")
        # IDs may not be strings (or may be absent); str.join needs strings.
        unknownIds = resultCount.get("unknownCarrierIds") or []
        print(f"Found {knownCount} of {queriedCount}")
        print("Carriers IDs not found: " + ', '.join(str(carrierId) for carrierId in unknownIds))

        reportList = []
        errorList = []
        for rawReport in body.get("reports") or []:
            try:
                reportList.append(RiskFactorReport.from_dict(rawReport))
            except Exception as e:
                # One malformed entry must not discard the remaining reports.
                errorList.append(str(e))
        return reportList, errorList

    # Error bodies are not guaranteed to be JSON (e.g. gateway error pages),
    # so fall back to a dict-of-lists shaped like the API's "errors" payload.
    fallback = {"response": [f"HTTP {response.status_code}: {response.text}"]}
    try:
        body = json.loads(response.text)
    except ValueError:
        return None, fallback
    errorText = body.get("errors") if isinstance(body, dict) else None
    return None, (errorText if errorText is not None else fallback)

def GetCarrierRMISCertificationStatus(accessToken:str, identifier:str, timeout:float = 30):
    """Fetch the certification status document for a carrier.

    Args:
        accessToken: Bearer token sent in the ``Authorization`` header.
        identifier: Carrier identifier; the request goes to
            ``baseURL + str(identifier) + "/certification"``.
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` may block indefinitely on an unresponsive server.

    Returns:
        ``(status, None)`` on HTTP 200, where ``status`` is the decoded JSON
        response body. Otherwise ``(None, error)``, where ``error`` is the
        ``"errors"`` entry of the response body, or
        ``{"response": ["HTTP <status>: <body>"]}`` when the body is not a
        JSON object or has no ``"errors"`` entry.

    Raises:
        requests.RequestException: On connection failures or timeouts.
        ValueError: If a 200 response body is not valid JSON.
    """
    url = baseURL + str(identifier) + "/certification"
    headers = {
        "Authorization": "bearer " + accessToken,
        "Accept": "application/json",
    }
    response = requests.get(url=url, headers=headers, timeout=timeout)

    if response.status_code == 200:
        return json.loads(response.text), None

    # Error bodies are not guaranteed to be JSON (e.g. gateway error pages),
    # so fall back to a dict-of-lists shaped like the API's "errors" payload.
    fallback = {"response": [f"HTTP {response.status_code}: {response.text}"]}
    try:
        body = json.loads(response.text)
    except ValueError:
        return None, fallback
    errorText = body.get("errors") if isinstance(body, dict) else None
    return None, (errorText if errorText is not None else fallback)


##################
## Sample usage ##
##################
# NOTE(review): everything below runs at import time and calls the API with
# isSandbox=False. It is kept at module level so the names it defines stay
# available to importers exactly as before.


def _print_errors(error):
    """Print every message in an error value returned by the helpers above.

    The helpers return errors in several shapes: a dict mapping a key to a
    list of messages, a plain string, a list of strings, or None. Each shape
    is handled here so reporting a failure never raises on its own.
    """
    if error is None:
        print("No error details returned")
    elif isinstance(error, dict):
        for messages in error.values():
            if isinstance(messages, (list, tuple)):
                for message in messages:
                    print (f'{message}')
            else:
                print (f'{messages}')
    elif isinstance(error, (list, tuple)):
        for item in error:
            _print_errors(item)
    else:
        print (f'{error}')


token = ManageTruckstopTokens(truckstopCreds=truckstopUserProd, isSandbox=False).AccessToken

#TEST 1
print("TEST #1 - Get Single Report")
riskFactorReport, error = GetReportByIdentifier(accessToken=token, identifier=2836509)
if (riskFactorReport is not None):
    print (riskFactorReport.riskVectors[0].riskSubVectors[0].subCategory)
else:
    _print_errors(error)


#TEST 2
print ("\n\nTEST #2 - Search for a carrier")
dotList, error = SearchForCarrier(accessToken=token, email="[email protected]")
if dotList is not None:
    if len(dotList) > 0:
        for dot in dotList:
            print(dot)
    else:
        print("No results found")
else:
    _print_errors(error)

#Test 3
print ("\n\nTest #3 - Get Bulk Reports")
reports, errors = GetBulkReports(accessToken=token, identifiers=[1,2836509,"3", "8"])
# reports is None when the request failed; errors is then a dict rather than
# the list of per-report messages returned on success.
for report in reports or []:
    print ("\n" + report.reportSummary)
_print_errors(errors)


#Test 4
print ("\n\nTest #4 - Get Carrier Cert status")
status, error = GetCarrierRMISCertificationStatus(accessToken=token, identifier=2836509)
if status is not None:
    print (f'Certified: {status.get("certified")}')
    print (f'Date (un)Certified: {status.get("dateCertified")}')
else:
    _print_errors(error)