TruckstopAuth.py

import requests
import sqlite3
import json
import datetime
import dateutil

# Global variables for database connection and Truckstop credentials
dbo = None
_truckstopCreds = None

class DBO:
    def __init__(self, sqlConn, sqlCursor):
        self.sqlConn = sqlConn
        self.sqlCursor = sqlCursor

class TruckstopCredentials:
    def __init__(self, Username, Password, Base64):
        self.UserName = Username
        self.Password = Password
        self.Base64 = Base64

class Token:
    def __init__(self, TruckstopUserId, TruckstopUserName, AccessToken, RefreshToken, AccessTokenExpiration, RefreshTokenExpiration):
        self.TruckstopUserId = TruckstopUserId
        self.TruckstopUserName = TruckstopUserName
        self.AccessToken = AccessToken
        self.RefreshToken = RefreshToken
        self.AccessTokenExpiration = AccessTokenExpiration
        self.RefreshTokenExpiration = RefreshTokenExpiration

def __SetupDB():
    # Initializes SQLite database and creates a UserTokens table if it doesn't exist
    sqlConn = sqlite3.connect('./UserTokens.sqlite')  # Connect to SQLite database
    sqlCursor = sqlConn.cursor()
    sqlCursor.execute('CREATE TABLE IF NOT EXISTS UserTokens (ID INTEGER PRIMARY KEY, TruckstopUserId varchar(255), TruckstopUserName varchar(255), AccessToken varchar(255), RefreshToken varchar(255), AccessTokenExpiration varchar(255), RefreshTokenExpiration varchar(255))')
    return DBO(sqlConn, sqlCursor)  # Return database object

def __RequestNewTokens(isSandbox=False):
    # Requests new access and refresh tokens using user credentials
    if not isSandbox:
        url = "https://api.truckstop.com/auth/token?scope=truckstop"
    else:
        url = "https://api-int.truckstop.com/auth/token?scope=truckstop"
    payload = {
        "grant_Type": "password",  # Grant type for password flow
        "username": _truckstopCreds.UserName,
        "password": _truckstopCreds.Password
    }
    headers = {
        "accept": "application/json",
        "Authorization": "basic " + _truckstopCreds.Base64,
        "content-type": "application/x-www-form-urlencoded"
    }
    response = requests.post(url, data=payload, headers=headers)  # Send POST request
    return response  # Return the API response

def __RefreshTokens(refreshToken, isSandbox=False):
    # Requests new tokens using an existing refresh token
    if not isSandbox:
        url = "https://api.truckstop.com/auth/token?scope=truckstop"
    else:
        url = "https://api-int.truckstop.com/auth/token?scope=truckstop"
    payload = {
        "grant_Type": "refresh_token",  # Grant type for refresh token flow
        "refresh_token": refreshToken
    }
    headers = {
        "accept": "application/json",
        "Authorization": "basic " + _truckstopCreds.Base64,
        "content-type": "application/x-www-form-urlencoded"
    }
    response = requests.post(url, data=payload, headers=headers)  # Send POST request
    return response  # Return the API response

def __ParseResponse(response):
    # Parses the API response to extract token information
    if (response.status_code == 200):  # Check for successful response
        response = response.text        
        response = json.loads(response)  # Parse JSON response
        claims = json.loads(response["claims"])  # Extract claims
        current_time = datetime.datetime.now(datetime.timezone.utc)  # Get current UTC time

        # Extract token details
        AccessToken = response["access_token"]
        RefreshToken = response["refresh_token"]
        TruckstopUserId = claims[1]["Value"]
        
        # Calculate token expiration times
        AccessTokenExpiration = current_time + datetime.timedelta(seconds=1169)
        RefreshTokenExpiration = current_time + dateutil.relativedelta.relativedelta(months=6)
        
        # Return a Token object with the extracted information
        return Token(TruckstopUserId, claims[8]["Value"], AccessToken, RefreshToken, AccessTokenExpiration.isoformat(), RefreshTokenExpiration.isoformat())

def __ManageDBTokens(token):
    # Updates or inserts token information into the SQLite database
    storedTokens = dbo.sqlCursor.execute('SELECT * FROM UserTokens WHERE TruckstopUserName = \'' + _truckstopCreds.UserName + '\'')
    storedTokens = dbo.sqlCursor.fetchall()
    if (len(storedTokens) > 0):  # Check if tokens exist for the user
        print("Updating Database with new tokens")
        # Update existing token records
        dbo.sqlCursor.execute('UPDATE UserTokens SET AccessToken = \'' + token.AccessToken + '\', RefreshToken = \'' + token.RefreshToken + '\', AccessTokenExpiration = \'' + token.AccessTokenExpiration + '\', RefreshTokenExpiration=\'' + token.RefreshTokenExpiration + '\' WHERE TruckstopUserName = \'' + _truckstopCreds.UserName + '\'')
    else:  # Insert new token records
        print("Inserting new user tokens into database")
        dbo.sqlCursor.execute('INSERT INTO UserTokens (TruckstopUserId,TruckstopUserName, AccessToken, RefreshToken, AccessTokenExpiration, RefreshTokenExpiration) values (\'' + token.TruckstopUserId + '\',\'' + _truckstopCreds.UserName + '\',\'' + token.AccessToken + '\', \'' + token.RefreshToken + '\', \'' + token.AccessTokenExpiration + '\', \'' + token.RefreshTokenExpiration + '\')')
    dbo.sqlConn.commit()  # Commit changes to the database

def ManageTruckstopTokens(truckstopCreds, isSandbox):
    # Main function to manage Truckstop API tokens
    global dbo 
    dbo = __SetupDB()  # Setup the database
    global _truckstopCreds 
    _truckstopCreds = truckstopCreds  # Store Truckstop credentials
    
    # Check for stored tokens in the database
    storedTokens = dbo.sqlCursor.execute('SELECT * FROM UserTokens WHERE TruckstopUserName = \'' + _truckstopCreds.UserName + '\'')
    storedTokens = dbo.sqlCursor.fetchall()
    
    if (len(storedTokens) > 0):  # If tokens exist
        AccessTokenExpiration = dateutil.parser.parse(storedTokens[0][5])        
        if (AccessTokenExpiration < datetime.datetime.now(datetime.timezone.utc)):  # Check if the access token is expired
            print("Token Expired: Refreshing")
            refreshToken = storedTokens[0][4]  # Get refresh token
            response = __RefreshTokens(refreshToken, isSandbox)  # Refresh tokens
            token = __ParseResponse(response)  # Parse the new tokens
            if (token):
                print("Successfully refreshed token")
                __ManageDBTokens(token)  # Update the database with new tokens
                return token
            else:  # If refresh fails, request new tokens
                print("Refresh failed, generating new token set")
                response = __RequestNewTokens(isSandbox)  # Request new tokens
                token = __ParseResponse(response)  # Parse the new tokens
                if(token):
                    __ManageDBTokens(token)  # Update the database with new tokens
                    return token
                else:
                    print("There was an error generating tokens:")
                    print(response.text)  # Log error message
                    return None
        else:  # Return existing tokens if valid
            token = Token(storedTokens[0][1],storedTokens[0][2], storedTokens[0][3],storedTokens[0][4],storedTokens[0][5], storedTokens[0][6])
            return token
    else:  # If no tokens exist, request new tokens
        response = __RequestNewTokens(isSandbox)
        token = __ParseResponse(response)
        if(token):
            __ManageDBTokens(token)  # Update the database with new tokens
            return token
        else:
            print("There was an error generating tokens:")
            print(response.text)  # Log error message
            return None