Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- # Version 27
- # POSTED ONLINE: https://pastebin.com/Dp4dK0Mz
- # SOURCE: https://stackoverflow.com/questions/3029816/how-do-i-get-a-thread-safe-print-in-python-2-6
- # Python uses separate opcodes for object printing and newline printing.
- # With multiple threads printing you would encounter newlines printed out of turn separate from the objects.
- # Joining the strings before calling print(...) fixes the issue.
def safe_print(*args, sep=' ', end='\n', file=None, flush=False):
    """Print ``args`` as one pre-joined string.

    The arguments are converted with ``str``, joined with ``sep`` and
    concatenated with ``end`` before ``print`` is called, so the text and its
    terminator reach ``print`` as a single object instead of separate pieces.

    Args:
        *args: Objects to print.
        sep: Separator placed between objects. ``None`` means a single space,
            matching the built-in ``print``.
        end: Terminator appended after the last object. ``None`` means a
            newline, matching the built-in ``print``.
        file: Output stream forwarded to ``print``.
        flush: Forwarded to ``print``.
    """
    # Mirror print(): None selects the default instead of raising.
    if sep is None:
        sep = ' '
    if end is None:
        end = '\n'
    # NOTE: end='' must stay; the terminator is already part of the string, so
    # letting print() add its own would produce an extra newline.
    print(sep.join(map(str, args)) + end, end='', file=file, flush=flush)
- # -----
- import asyncio
class App:
    """An application whose latest published version is looked up on the web."""

    def __init__(self, name, installedVersion, patternData, urlData):
        """Store the application's identity and lookup settings.

        Args:
            name: Display name used in status messages.
            installedVersion: Last known version string to compare against.
            patternData: Sequence of ``(pattern, matchIndex)``: the regular
                expression and the index of the match to use.
            urlData: Sequence of ``(url, encoding)``: the page to download and
                the encoding used to decode it.
        """
        self.name = name
        self.installedVersion = installedVersion
        self.__pattern = patternData[0]
        self.__patternMatchIndex = patternData[1]
        self.__url = urlData[0]
        self.__urlEncoding = urlData[1]

    async def GetLatestVersion(self):
        """Return the latest version found at the URL, or ``None`` on failure.

        The blocking download runs in the event loop's default executor.
        URLs containing ``api.github.com`` are parsed as JSON and the version
        is read from the ``name`` field of the first element; every other URL
        is searched as plain text.
        """
        loop = asyncio.get_running_loop()
        if 'api.github.com' in self.__url:
            return await loop.run_in_executor(
                None, search_json, self.__pattern, self.__url, [0, 'name'],
                self.__urlEncoding, self.__patternMatchIndex)
        return await loop.run_in_executor(
            None, search_url, self.__pattern, self.__url,
            self.__urlEncoding, self.__patternMatchIndex)

    async def DisplayStatus(self, skipUpToDate=True):
        """Print the update status and report whether attention is needed.

        Args:
            skipUpToDate: When true, nothing is printed for an application
                whose installed version equals the latest version.

        Returns:
            ``True`` when the latest version differs from the installed one,
            which includes the case where the lookup failed.
        """
        latestVersion = await self.GetLatestVersion()
        upToDate = self.installedVersion == latestVersion
        if latestVersion is None:
            safe_print(color(' ! {}: Failed to detect the latest version (Last known version: {} --> ?)\n URL: {}\n'.format(self.name, self.installedVersion, self.__url), RED))
        elif not upToDate:
            safe_print(color(' * {}: Update detected (Last known version: {} --> {})\n URL: {}\n'.format(self.name, self.installedVersion, latestVersion, self.__url), YELLOW))
        elif not skipUpToDate:
            safe_print(color(' - {}: No update detected (Last known version: {})\n'.format(self.name, latestVersion), GREEN))
        return not upToDate
- # -----
- import json
def search_json(pattern, url, indicies, encoding='UTF-8', matchIndex=0, timeoutSeconds=5):
    """Download a JSON document, walk into it and regex-search the value found.

    Args:
        pattern: Regular expression applied to the selected JSON value.
        url: Address of the JSON document.
        indicies: Sequence of keys / list indexes applied one after another to
            the decoded document to reach the value to search.
        encoding: Encoding used to decode the response body.
        matchIndex: Index of the regex match to return.
        timeoutSeconds: Download timeout in seconds.

    Returns:
        The selected match, or ``None`` when the download fails, the document
        is not valid JSON, the path does not exist, the value reached is not a
        string, or the pattern does not match.
    """
    response = download_url(url, encoding, timeoutSeconds)
    if response is None:
        return None
    try:
        data = json.loads(response)
        for index in indicies:
            data = data[index]
    except (ValueError, LookupError, TypeError):
        # ValueError: malformed JSON. LookupError: missing key or index.
        # TypeError: the value reached cannot be indexed that way.
        # The caller already reports a None result as a failed lookup.
        return None
    if not isinstance(data, str):
        # Only text can be searched with a str pattern.
        return None
    return regex_search(data, pattern, matchIndex)
def search_url(pattern, url, encoding='UTF-8', matchIndex=0, timeoutSeconds=5):
    """Download ``url`` as text and return the regex match at ``matchIndex``.

    The result of ``download_url`` is passed straight to ``regex_search``.
    """
    return regex_search(download_url(url, encoding, timeoutSeconds), pattern, matchIndex)
- # -----
- import urllib.request
- from urllib.error import *
- import socket
def download_url(url, encoding='UTF-8', timeoutSeconds=5):
    """Download ``url`` and return its body decoded as text.

    Args:
        url: Address to fetch.
        encoding: Encoding used to decode the response body.
        timeoutSeconds: Timeout in seconds passed to ``urlopen``.

    Returns:
        The decoded body, or ``None`` after printing a message when the
        request, the read or the decoding fails.
    """
    try:
        # SECURITY NOTE(review): unverifiedContext turns off TLS certificate
        # verification for this request.
        with urllib.request.urlopen(url, timeout=timeoutSeconds, context=unverifiedContext) as response:
            return response.read().decode(encoding)
    except HTTPError as error:
        safe_print(' HTTP-Error: Data not retrieved because {}\n URL: {}\n'.format(error, url))
    except URLError as error:
        if isinstance(error.reason, socket.timeout):
            safe_print(' Timeout-Error: Data not retrieved because {}\n URL: {}\n'.format(error, url))
        else:
            safe_print(' URL-Error: Data not retrieved because {}\n URL: {}\n'.format(error, url))
    except socket.timeout as error:
        # A timeout while waiting for or reading the response is raised
        # directly instead of being wrapped in URLError.
        safe_print(' Timeout-Error: Data not retrieved because {}\n URL: {}\n'.format(error, url))
    except OSError as error:
        # Other socket-level failures, e.g. a connection reset mid-transfer.
        safe_print(' Connection-Error: Data not retrieved because {}\n URL: {}\n'.format(error, url))
    except (UnicodeDecodeError, LookupError) as error:
        # The body is not valid in `encoding`, or the encoding name is unknown.
        safe_print(' Decode-Error: Data not retrieved because {}\n URL: {}\n'.format(error, url))
    return None
# Disable certificate verification with unverified SSL context.
# SOURCE: https://support.chainstack.com/hc/en-us/articles/9117198436249-Common-SSL-Issues-on-Python-and-How-to-Fix-it
# NOTE(review): download_url passes this context to urlopen, so server
# certificates are not validated for any download. _create_unverified_context
# is an underscore-prefixed helper of the ssl module, not part of its public API.
import ssl
unverifiedContext = ssl._create_unverified_context()
- # -----
- import re
def regex_search(string, pattern, matchIndex=0):
    """Return the ``matchIndex``-th result of ``re.findall(pattern, string)``.

    Args:
        string: Text to search, or ``None``.
        pattern: Regular expression handed to ``re.findall``.
        matchIndex: Index into the list of matches; negative values count
            from the end.

    Returns:
        The selected match, or ``None`` when ``string`` is ``None``, nothing
        matches, or ``matchIndex`` is outside the list of matches.
    """
    if string is None:
        return None
    matches = re.findall(pattern, string)
    # Covers both "no matches" and "fewer matches than requested".
    if not -len(matches) <= matchIndex < len(matches):
        return None
    return matches[matchIndex]
- # -----
# Terminal escape sequences (ESC [ <code> m) used to style printed text.
# Each color name prefixes text via color(); RESET is appended to end the styling.
# BLACK = '\033[30m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
UNDERLINE = '\033[4m'
RESET = '\033[0m'
def color(var, color):
    """Return ``str(var)`` prefixed with the ``color`` escape sequence and followed by RESET."""
    text = str(var)
    return color + text + RESET
import platform
# Name of the operating system as reported by platform.system().
system = platform.system()
if system == 'Windows':
    import os
    # Needed to display colored text on Windows for some reason.
    # NOTE(review): presumably running a console command is what switches on
    # escape-sequence handling; 'cls' also clears the screen as a side effect.
    # Confirm before changing.
    os.system('cls')
- # -----
- import socket
def _can_connect(host, port, timeout):
    """Return True if a TCP connection to (host, port) opens within ``timeout`` seconds."""
    try:
        with socket.create_connection((host, port), timeout):
            return True
    except OSError:
        return False


async def wait_for_internet(host="www.google.com", port=80, timeout=1, retrySeconds=1):
    """Wait until a TCP connection to ``host``:``port`` succeeds.

    The blocking connection attempt runs in the event loop's default executor
    so the loop is not stalled while it waits. A notice is printed once, after
    the first failed attempt.

    Args:
        host: Host name to connect to.
        port: TCP port to connect to.
        timeout: Timeout in seconds for each connection attempt.
        retrySeconds: Delay in seconds between failed attempts.

    Returns:
        ``True`` once a connection has been established.
    """
    loop = asyncio.get_running_loop()
    announced = False
    while not await loop.run_in_executor(None, _can_connect, host, port, timeout):
        if not announced:
            safe_print(color(" Waiting for internet connection . . .\n", YELLOW))
            announced = True
        await asyncio.sleep(retrySeconds)
    return True
- # -----
def wait_for_any_keypress():
    """Block until the user presses a key.

    On Windows this runs the console ``pause`` command. On Linux and macOS it
    prints a prompt and reads one character from stdin; when stdin is a
    terminal it is switched to raw mode for the read and restored afterwards.
    On any other platform the function returns immediately.
    """
    import sys
    if sys.platform == 'win32':
        import os
        os.system('pause')
    elif sys.platform.startswith('linux') or sys.platform == 'darwin':
        print('Press any key to continue . . .', flush=True)
        if not sys.stdin.isatty():
            # termios calls fail on pipes and redirected files; just consume
            # one character (or hit end-of-file) without touching tty state.
            sys.stdin.read(1)
            return
        import termios
        import tty
        stdin_fd = sys.stdin.fileno()
        saved_attrs = termios.tcgetattr(stdin_fd)
        try:
            # Raw mode delivers a single keypress without waiting for Enter.
            tty.setraw(stdin_fd)
            sys.stdin.read(1)
        finally:
            # Always restore the terminal, even if the read is interrupted.
            termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
- # -----
# NOTE: You need to update these version numbers manually until a way is found to query them from the installed software directly.
# Each entry is App(name, installedVersion, (pattern, matchIndex), (url, encoding)):
# the regex is searched in the page at `url` and the match at `matchIndex` is taken as the latest version.
apps = [
    App('Voicemeeter Banana', '2.1.1.9', (r'Version ([0-9.]+)(?: \([\w\s]+\))', 1), ('https://voicemeeter.com', 'UTF-8')),
    # App('Voicemeeter Banana', '2.0.6.8', (r'Voicemeeter ([0-9.]+) \(ZIP Package\)', 0), ('https://vb-audio.com/Voicemeeter/banana.htm', 'UTF-8')), # This website keeps dying...
    App('Equalizer APO', '1.4.1', (r'EqualizerAPO(?:.*?)-([0-9.]+)\.exe', 0), ('https://sourceforge.net/projects/equalizerapo/files/', 'UTF-8')),
    App('HeSuVi', '2.0.0.1', (r'HeSuVi_([0-9.]+)\.exe', 0), ('https://sourceforge.net/projects/hesuvi/files/', 'UTF-8')),
    App('IEM Plug-in Suite', '1.14.1', (r'Latest version: <strong>v([0-9.]+)', 0), ('https://plugins.iem.at/download/', 'UTF-8')),
    # App('Vim', '9.1', (r'Vim ([0-9.]+) is the latest stable version', 0), ('https://www.vim.org/download.php/', 'ISO-8859-1')), # This website keeps dying...
    App('Vim', '9.1', (r'\d+\.\d+', 0), ('https://api.github.com/repos/vim/vim/tags', 'UTF-8')),
    App('Nvidia', '566.36', (r'GeForce ([0-9.]+)', 0), ('https://www.guru3d.com/files/category/videocards-nvidia-geforce-windows-7-8-10/', 'UTF-8')),
    App('MSI Afterburner', '4.6.6', (r'MSI Afterburner ([0-9.]+) Download', 0), ('https://www.guru3d.com/download/msi-afterburner-beta-download/', 'UTF-8')),
]
async def main():
    """Check every configured application and pause before exiting when appropriate.

    Passing ``nopause`` as the first command-line argument hides up-to-date
    applications and skips the final pause unless at least one application
    reported that it needs attention.
    """
    import sys
    nopauseRequested = len(sys.argv) != 1 and sys.argv[1] == 'nopause'
    pauseNeeded = not nopauseRequested
    await wait_for_internet()
    pending = [app.DisplayStatus(skipUpToDate=nopauseRequested) for app in apps]
    updateNeeded = False
    # Handle results in completion order; each task prints its own status.
    for finished in asyncio.as_completed(pending):
        updateNeeded = (await finished) or updateNeeded
    if pauseNeeded or updateNeeded:
        wait_for_any_keypress()
# Run the checks only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement