import requests
import sqlite3
import json
import datetime
import dateutil
# Module-level state shared by the helper functions in this file.
# Both names start as None and are assigned by ManageTruckstopTokens() before
# any helper that reads them is called.
dbo = None  # DBO instance (SQLite connection + cursor) returned by __SetupDB()
_truckstopCreds = None  # credentials object passed to ManageTruckstopTokens()
class DBO:
    """Hold a database connection together with a cursor for it.

    Attributes:
        sqlConn: the connection object (used by callers to commit).
        sqlCursor: the cursor object (used by callers to run statements).
    """

    def __init__(self, sqlConn, sqlCursor):
        # Store both handles unchanged; no validation is performed here.
        self.sqlConn, self.sqlCursor = sqlConn, sqlCursor
class TruckstopCredentials:
    """Credentials used when calling the Truckstop token endpoint.

    Attributes:
        UserName: account name; sent as the ``username`` form field and used
            as the lookup key in the UserTokens table.
        Password: account password; sent as the ``password`` form field.
        Base64: value appended after ``"basic "`` in the Authorization header.
    """

    def __init__(self, Username, Password, Base64):
        # Note the spelling difference: the ``Username`` argument is exposed
        # as the ``UserName`` attribute.
        self.UserName, self.Password, self.Base64 = Username, Password, Base64
class Token:
    """Plain record describing one user's access/refresh token pair.

    Attributes:
        TruckstopUserId: user id associated with the tokens.
        TruckstopUserName: user name associated with the tokens.
        AccessToken: the access token string.
        RefreshToken: the refresh token string.
        AccessTokenExpiration: expiry of the access token (callers in this
            file pass an ISO-8601 string).
        RefreshTokenExpiration: expiry of the refresh token (callers in this
            file pass an ISO-8601 string).
    """

    def __init__(
        self,
        TruckstopUserId,
        TruckstopUserName,
        AccessToken,
        RefreshToken,
        AccessTokenExpiration,
        RefreshTokenExpiration,
    ):
        # Identity of the token owner.
        self.TruckstopUserId, self.TruckstopUserName = TruckstopUserId, TruckstopUserName
        # The token strings themselves.
        self.AccessToken, self.RefreshToken = AccessToken, RefreshToken
        # When each token stops being valid.
        self.AccessTokenExpiration = AccessTokenExpiration
        self.RefreshTokenExpiration = RefreshTokenExpiration
def __SetupDB():
    """Open ./UserTokens.sqlite and make sure the UserTokens table exists.

    Returns:
        A DBO wrapping the newly opened connection and a cursor created
        from it.
    """
    connection = sqlite3.connect('./UserTokens.sqlite')
    cursor = connection.cursor()

    # Adjacent literals concatenate to the exact single-line statement.
    createTableSql = (
        'CREATE TABLE IF NOT EXISTS UserTokens ('
        'ID INTEGER PRIMARY KEY, '
        'TruckstopUserId varchar(255), '
        'TruckstopUserName varchar(255), '
        'AccessToken varchar(255), '
        'RefreshToken varchar(255), '
        'AccessTokenExpiration varchar(255), '
        'RefreshTokenExpiration varchar(255))'
    )
    cursor.execute(createTableSql)

    return DBO(connection, cursor)
def __RequestNewTokens(isSandbox=False, timeout=30):
    """Request a new access/refresh token pair using the password grant.

    Reads the user name, password and Base64 value from the module-level
    ``_truckstopCreds`` object, so that global must be set before calling.

    Args:
        isSandbox: when true, post to the ``api-int`` host instead of the
            production host.
        timeout: seconds passed to ``requests.post``; without a timeout a
            stalled server would block the caller indefinitely. Pass None to
            restore the old wait-forever behavior.

    Returns:
        The ``requests`` response object, unparsed.

    Raises:
        requests.exceptions.RequestException: on connection failure or when
            the timeout elapses.
    """
    if isSandbox:
        url = "https://api-int.truckstop.com/auth/token?scope=truckstop"
    else:
        url = "https://api.truckstop.com/auth/token?scope=truckstop"

    payload = {
        "grant_Type": "password",  # Grant type for password flow
        "username": _truckstopCreds.UserName,
        "password": _truckstopCreds.Password,
    }
    headers = {
        "accept": "application/json",
        "Authorization": "basic " + _truckstopCreds.Base64,
        "content-type": "application/x-www-form-urlencoded",
    }

    # data= sends the payload form-encoded, matching the content-type header.
    return requests.post(url, data=payload, headers=headers, timeout=timeout)
def __RefreshTokens(refreshToken, isSandbox=False, timeout=30):
    """Request a new token pair using an existing refresh token.

    Reads the Base64 value for the Authorization header from the
    module-level ``_truckstopCreds`` object, so that global must be set
    before calling.

    Args:
        refreshToken: the refresh token string to exchange.
        isSandbox: when true, post to the ``api-int`` host instead of the
            production host.
        timeout: seconds passed to ``requests.post``; without a timeout a
            stalled server would block the caller indefinitely. Pass None to
            restore the old wait-forever behavior.

    Returns:
        The ``requests`` response object, unparsed.

    Raises:
        requests.exceptions.RequestException: on connection failure or when
            the timeout elapses.
    """
    if isSandbox:
        url = "https://api-int.truckstop.com/auth/token?scope=truckstop"
    else:
        url = "https://api.truckstop.com/auth/token?scope=truckstop"

    payload = {
        "grant_Type": "refresh_token",  # Grant type for refresh token flow
        "refresh_token": refreshToken,
    }
    headers = {
        "accept": "application/json",
        "Authorization": "basic " + _truckstopCreds.Base64,
        "content-type": "application/x-www-form-urlencoded",
    }

    # data= sends the payload form-encoded, matching the content-type header.
    return requests.post(url, data=payload, headers=headers, timeout=timeout)
def __ParseResponse(response):
    """Turn a token-endpoint response into a Token, or None on failure.

    Args:
        response: object exposing ``status_code`` and ``text`` (as returned
            by ``requests.post``).

    Returns:
        A Token whose expiration fields are ISO-8601 strings computed from
        the current UTC time, or None when the status code is not 200.

    Raises:
        ValueError / KeyError / IndexError: if a 200 response does not have
            the JSON layout this function reads.
    """
    if response.status_code != 200:
        return None

    # A bare ``import dateutil`` does not reliably load the relativedelta
    # submodule, so import it explicitly where it is needed.
    from dateutil.relativedelta import relativedelta

    body = json.loads(response.text)
    # "claims" is itself a JSON-encoded string inside the body.
    claims = json.loads(body["claims"])
    now = datetime.datetime.now(datetime.timezone.utc)

    # NOTE(review): the claim positions are fixed by index; presumably entry 1
    # is the user id and entry 8 the user name -- confirm against the API.
    truckstopUserId = claims[1]["Value"]
    truckstopUserName = claims[8]["Value"]

    # NOTE(review): lifetimes are hard-coded (1169 s / 6 months) rather than
    # read from the response -- confirm they match the server's values.
    accessTokenExpiration = now + datetime.timedelta(seconds=1169)
    refreshTokenExpiration = now + relativedelta(months=6)

    return Token(
        truckstopUserId,
        truckstopUserName,
        body["access_token"],
        body["refresh_token"],
        accessTokenExpiration.isoformat(),
        refreshTokenExpiration.isoformat(),
    )
def __ManageDBTokens(token):
    """Store ``token`` for the current user, updating the row if one exists.

    The row is keyed by ``_truckstopCreds.UserName``; both the module-level
    ``dbo`` and ``_truckstopCreds`` must be set before calling. All values
    are bound as SQL parameters, so quotes or other special characters in
    user names and tokens cannot alter the statement.

    Args:
        token: object exposing TruckstopUserId, AccessToken, RefreshToken,
            AccessTokenExpiration and RefreshTokenExpiration.
    """
    userName = _truckstopCreds.UserName
    cursor = dbo.sqlCursor

    cursor.execute(
        'SELECT 1 FROM UserTokens WHERE TruckstopUserName = ? LIMIT 1',
        (userName,),
    )
    if cursor.fetchone() is not None:
        print("Updating Database with new tokens")
        # TruckstopUserId is intentionally left untouched on update.
        cursor.execute(
            'UPDATE UserTokens SET AccessToken = ?, RefreshToken = ?, '
            'AccessTokenExpiration = ?, RefreshTokenExpiration = ? '
            'WHERE TruckstopUserName = ?',
            (
                token.AccessToken,
                token.RefreshToken,
                token.AccessTokenExpiration,
                token.RefreshTokenExpiration,
                userName,
            ),
        )
    else:
        print("Inserting new user tokens into database")
        cursor.execute(
            'INSERT INTO UserTokens (TruckstopUserId, TruckstopUserName, '
            'AccessToken, RefreshToken, AccessTokenExpiration, '
            'RefreshTokenExpiration) VALUES (?, ?, ?, ?, ?, ?)',
            (
                token.TruckstopUserId,
                userName,
                token.AccessToken,
                token.RefreshToken,
                token.AccessTokenExpiration,
                token.RefreshTokenExpiration,
            ),
        )

    dbo.sqlConn.commit()
def __RequestAndStoreNewTokens(isSandbox):
    """Request a brand-new token pair and persist it; return Token or None.

    On failure the response body is printed and None is returned.
    """
    response = __RequestNewTokens(isSandbox)
    token = __ParseResponse(response)
    if token:
        __ManageDBTokens(token)  # Update the database with new tokens
        return token
    print("There was an error generating tokens:")
    print(response.text)  # Log error message
    return None


def ManageTruckstopTokens(truckstopCreds, isSandbox=False):
    """Return a usable Token for ``truckstopCreds``, fetching one if needed.

    Resolution order:
      1. A stored, unexpired access token is returned as-is.
      2. An expired access token is refreshed with the stored refresh token.
      3. If the refresh fails, or nothing is stored, a new token pair is
         requested with the user name and password.

    Side effects: opens the SQLite database via ``__SetupDB`` and assigns
    the module-level ``dbo`` and ``_truckstopCreds``.
    NOTE(review): a new connection is opened on every call and the previous
    one is not closed -- confirm whether callers invoke this repeatedly.

    Args:
        truckstopCreds: object exposing UserName, Password and Base64.
        isSandbox: when true, the ``api-int`` host is used for token calls.

    Returns:
        A Token, or None when no tokens could be obtained.
    """
    global dbo, _truckstopCreds
    # A bare ``import dateutil`` does not reliably load the parser submodule,
    # so import it explicitly.
    from dateutil import parser as dateParser

    dbo = __SetupDB()  # Setup the database
    _truckstopCreds = truckstopCreds  # Store Truckstop credentials

    # Bind the user name as a parameter so quotes in it cannot break the SQL.
    dbo.sqlCursor.execute(
        'SELECT TruckstopUserId, TruckstopUserName, AccessToken, RefreshToken, '
        'AccessTokenExpiration, RefreshTokenExpiration '
        'FROM UserTokens WHERE TruckstopUserName = ?',
        (_truckstopCreds.UserName,),
    )
    storedRow = dbo.sqlCursor.fetchone()
    if storedRow is None:
        # Nothing cached for this user yet.
        return __RequestAndStoreNewTokens(isSandbox)

    refreshToken = storedRow[3]
    accessTokenExpiration = dateParser.parse(storedRow[4])
    if accessTokenExpiration >= datetime.datetime.now(datetime.timezone.utc):
        # Stored access token is still valid.
        return Token(*storedRow)

    print("Token Expired: Refreshing")
    token = __ParseResponse(__RefreshTokens(refreshToken, isSandbox))
    if token:
        print("Successfully refreshed token")
        __ManageDBTokens(token)  # Update the database with new tokens
        return token

    print("Refresh failed, generating new token set")
    return __RequestAndStoreNewTokens(isSandbox)