Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- #
- # client is publishing enocean sensors to various listed iOt telemetry or email or sms text message
- # subscriber sister application is to read this from the mosquito broker for example if you are doing the mqtt internally on raspberry pi 4
- #
- # import neccessary libraries
- import paho.mqtt.client as mqtt
- import requests
- import json
- import base64
- import serial
- import time
- from sys import exit
- from datetime import datetime
- # enocean
- PORT = '/dev/ttyUSB400J' # USB400J The name of the device to which it is connected
- SENSORID1 = '04:01:53:e1' # ID of the 1st STM-431J
- SENSORID2 = '04:00:6f:e5' # ID of the 2nd STM-431J
- s_port = 0 # serial port number handle
- BAUD_RT = 57600 # serial baud rate
- # soracom
- URL = 'http://harvest.soracom.io/' # SORACOM HarvestのURL
- # mosquito
- MTOPIC = "topic/B/"
- MQTT_PORT = 1883
- KEEP_ALIVE = 60
- # beebotte
- TOKEN = "[BeebotteTKName]"
- HOSTNAME = "mqtt.beebotte.com"
- BPORT = 8883
- BTOPIC = "MyEnOceanSensors/temperature" # beebotte channel/resource
- CACERT = "mqtt.beebotte.com.pem"
- # ubidots
- # pip install ubidots==1.6.6
- # sudo apt-get install python-setuptools
- UBI_URL=https://industrial.api.ubidots.com/api/v1.6
- # machinist
- MAPIKEY = "xxxxxx"
- MUrl = "https://gw.machinist.iij.jp/endpoint"
- # aws
- AWS_TOPIC_NAME = "my/temperatures"
- AWS_CLIENT="iot-data"
- # azure
- AZURE_TYPE = "connection_string" # choose connection_string or provisioning_host
- AZURE_CONN_STR="yourConnectionString"
- # ensure environment variables are set for your device and IoT Central application credentials
- PROVISIONING_HOST = "your host"
- ID_SCOPE = "your_scope"
- DEVICE_ID = "your_device"
- DEVICE_KEY = "your_key"
- provisioning_host = PROVISIONING_HOST
- id_scope = ID_SCOPE
- registration_id = DEVICE_ID
- symmetric_key = DEVICE_KEY
- # yandex
- YPORT=8443
- YHOST="rc1c-xxxx123456.mdb.yandexcloud.net"
- YCERTNAME="your_cert_name"
- YUrl=r'https://api.iot.yandex.net/v1.0/devices/actions'
- YAPI_KEY="xxxxxx"
- import datetime
- import pytz
- # smtp
- smtp_server = "smtp.gmail.com"
- port = 587
- # ssl server with tls_set
- SSLHOST, SSLPORT = "127.0.0.1", 12345
- # ssl23
- SSL_URL = '127.0.0.1'
- SSL_PORT = 10023
- # cloud MQTT
- CHOSTNAME = "driver.cloudmqtt.com"
- CPORT = 28607
- CTOPIC = "pi/sub2"
- CCACERT = "/etc/ssl/certs/ca-certificates.crt"
- #define a global MQTT Topic which will be set upon choices
- GTOPIC=" "
- # ------------ here list the choices and options for iOt -----------------
- TELEM_CHOICES=[ "soracom", "beebotte", "mosquito", "ubidots", "machinist", "aws", "azure", "yandex", "twillio", "smtp_email", "ssl_tls_server", "ssl_23_server", "cloud_mqtt", "gcs_blob", "splunk" ]
- SORACOM=0
- BEEBOTTE=1
- MOSQUITO=2
- UBIDOTS=3
- MACHINIST=4
- AWS=5
- AZURE=6
- YANDEX=7
- TWILLIO=8
- SMTP_EMAIL=9
- SSL_TLS_SERVER=10
- SSL_23_SERVER=11
- CLOUD_MQTT=12
- BLOB_GCS=13
- SPLUNK=14
- # ============= make your choice of cloud service here from list above ==================
- MY_CURRENT_TELEM=TELEM_CHOICES[SORACOM]
- # if you want to use AES with SSLv23
- AES_ENCRYPT=0
- # Broker processes when connected to
- def on_connect(client, userdata, flag, rc):
- print("Connect Broker:" + str(rc))
- client.subscribe(GTOPIC)
- # Broker processes when disconnected
- def on_disconnect(client, userdata, rc):
- if rc != 0:
- print("disconnect broker")
- # publish processes when publish done
- def on_publish(client, userdata, mid):
- print("publish Done")
- # This function reads 1 byte of data from the serial port and parses the EnOcean telegram. After analyzing
- # Telegram, data is sent to the chosen iOt system
- def Enocean2Telemetry(s_port,telem_opt):
- # SORACOM Harvest。
- def sendDataSoraCom(string1, temp_data1, string2, temp_data2):
- headers = {'content-type': 'application/json'}
- payload = {string1:temp_data1}
- print payload
- try:
- req = requests.post(URL, data=json.dumps(payload), headers=headers)
- req.raise_for_status()
- except requests.exceptions.HTTPError as errh:
- print ("Http Error:",errh)
- except requests.exceptions.ConnectionError as errc:
- print ("Error Connecting:",errc)
- except requests.exceptions.Timeout as errt:
- print ("Timeout Error:",errt)
- except requests.exceptions.RequestException as err:
- print ("OOps: Something Else",err)
- headers = {'content-type': 'application/json'}
- payload = {string2:temp_data2}
- print payload
- try:
- req = requests.post(URL, data=json.dumps(payload), headers=headers)
- req.raise_for_status()
- except requests.exceptions.HTTPError as errh:
- print ("Http Error:",errh)
- except requests.exceptions.ConnectionError as errc:
- print ("Error Connecting:",errc)
- except requests.exceptions.Timeout as errt:
- print ("Timeout Error:",errt)
- except requests.exceptions.RequestException as err:
- print ("OOps: Something Else",err)
- # SPLUNK
- def sendDataSplunk(string1, temp_data1, string2, temp_data2):
- splunk_config = {
- "wifi": {
- "ssid": "WIFI SID",
- "password": "WIFI YOUR_PASSWD"
- },
- "hec": {
- "url": "http://SplunkMY_IDIP:8088/services/collector/event",
- "token": "HTTP Event Collector YOUR_TOKEN",
- "hostname": "MySplunkServer"
- }
- }
- url = splunk_config['hec']['url']
- token = splunk_config['hec']['token']
- host = splunk_config['hec']['hostname']
- data_val = { "values" : [ {"metric_name" : string1, "sensor" : SENSORID1, "_value" : temp_data1}, {"metric_name" : string2 , "sensor" : SENSORID2, "_value" : temp_data2} ] }
- source=None
- index=None
- timestamp=round(datetime.datetime.now(pytz.timezone('Europe/Moscow')).timestamp())
- u'Send measured data to Splunk HEC'
- headers = {'Authorization': 'Splunk {}'.format(token)}
- payload = []
- #for data in values:
- for cnt in range(len(data_val['values'])):
- p = {
- "event": "metric",
- "source": source if source is not None else data_val['values'][int(cnt)]['sensor'],
- "host": host,
- "fields": {
- "metric_name": data_val['values'][int(cnt)]['metric_name'],
- "sensor": data_val['values'][int(cnt)]['sensor'],
- "_value": data_val['values'][int(cnt)]['_value'],
- }
- }
- if index is not None:
- p['index'] = index
- if timestamp is not None:
- p['time'] = timestamp
- payload.append(p)
- print("[DEBUG] HEC payload", payload)
- try:
- res = requests.post(url, headers=headers, data=json.dumps(payload))
- req.raise_for_status()
- except requests.exceptions.HTTPError as errh:
- print ("Http Error:",errh)
- except requests.exceptions.ConnectionError as errc:
- print ("Error Connecting:",errc)
- except requests.exceptions.Timeout as errt:
- print ("Timeout Error:",errt)
- except requests.exceptions.RequestException as err:
- print ("OOps: Something Else",err)
- # MOSQUITO
- def sendDataMosquito(string1, enO_temp_value1, string2, enO_temp_value2):
- msg = string1 + enO_temp_value1
- client.publish(MTOPIC,msg)
- msg = string2 + enO_temp_value2
- client.publish(MTOPIC,msg)
- client.disconnect()
- # BEEBOTTE
- def sendDataBeebotte(string1, enO_temp_value1, string2, enO_temp_value2):
- msg = string1 + enO_temp_value1
- client.publish(BTOPIC,msg)
- msg = string2 + enO_temp_value2
- client.publish(BTOPIC,msg)
- client.disconnect()
- # CLOUD MQTT
- def sendDataCloudMqtt(string1, enO_temp_value1, string2, enO_temp_value2):
- msg = string1 + enO_temp_value1
- client.publish(CTOPIC,msg,0)
- msg = string2 + enO_temp_value2
- client.publish(CTOPIC,msg,0)
- client.disconnect()
- # AWS
- def sendDataAws(string1, enO_temp_value1, string2, enO_temp_value2):
- data = {
- "temp1": enO_temp_value1,
- "temp1_desc": string1,
- "temp2": enO_temp_value2,
- "temp2_desc": string2,
- }
- json_data = bytes(json.dumps(data),"utf-8")
- print(f"sending to aws -> {sys.getsizeof(json_data)} bytes")
- client = boto3.client(AWS_CLIENT)
- client.publish(topic=AWS_TOPIC_NAME, payload=json_data)
- # UBIDOTS
- def sendDataUbiDots(string1, temp_data1, string2, temp_data2):
- try:
- my_variable = api.get_variable('56799cf1231b28459f976417')
- except UbidotsError400 as e:
- print("General Description: %s; and the detail: %s" % (e.message, e.detail))
- except UbidotsForbiddenError as e:
- print("For some reason my account does not have permission to read this variable")
- print("General Description: %s; and the detail: %s" % (e.message, e.detail))
- payload = {string1:int(temp_data1)}
- print payload
- new_value = my_variable.save_value(payload)
- try:
- my_variable2 = api.get_variable('56799cf1031b28459f976718')
- except UbidotsError400 as e:
- print("General Description: %s; and the detail: %s" % (e.message, e.detail))
- except UbidotsForbiddenError as e:
- print("For some reason my account does not have permission to read this variable")
- print("General Description: %s; and the detail: %s" % (e.message, e.detail))
- payload = {string2:int(temp_data2)}
- print payload
- new_value = my_variable2.save_value(payload)
- # AZURE
- def sendDataAzure(descrip1,temp_data1,descrip2,temp_data2):
- print("Sending telemetry for temperature to azure")
- temperature_msg = {descrip1 : temp_data1}
- msg = Message(json.dumps(temperature_msg))
- msg.content_encoding = "utf-8"
- msg.content_type = "application/json"
- print("Send message : ",msg)
- device_client.send_message(msg)
- temperature_msg1 = {descrip2 : temp_data2}
- msg1 = Message(json.dumps(temperature_msg1))
- msg1.content_encoding = "utf-8"
- msg1.content_type = "application/json"
- print("Send message : ",msg1)
- device_client.send_message(msg1)
- device_client.disconnect()
- device_client.shutdown()
- # sub functions to store certs in azure key vaalts and may be useful for management of certs
- #
- def createCertAzure():
- from azure.identity import DefaultAzureCredential
- from azure.keyvault.certificates import CertificateClient, CertificatePolicy
- credential = DefaultAzureCredential()
- certificate_client = CertificateClient(vault_url="https://my-key-vault.vault.azure.net/", credential=credential)
- create_certificate_poller = certificate_client.begin_create_certificate(certificate_name="cert-name", policy=CertificatePolicy.get_default())
- print(create_certificate_poller.result())
- def getLatestCertAzure():
- from azure.identity import DefaultAzureCredential
- from azure.keyvault.certificates import CertificateClient
- credential = DefaultAzureCredential()
- certificate_client = CertificateClient(vault_url="https://my-key-vault.vault.azure.net/", credential=credential)
- certificate = certificate_client.get_certificate("cert-name")
- print(certificate.name)
- print(certificate.properties.version)
- print(certificate.policy.issuer_name)
- def updateCertAzure():
- from azure.identity import DefaultAzureCredential
- from azure.keyvault.certificates import CertificateClient
- credential = DefaultAzureCredential()
- certificate_client = CertificateClient(vault_url="https://my-key-vault.vault.azure.net/", credential=credential)
- # we will now disable the certificate for further use
- updated_certificate= certificate_client.update_certificate_properties(certificate_name="cert-name", enabled=False)
- def deleteCertAzure():
- from azure.identity import DefaultAzureCredential
- from azure.keyvault.certificates import CertificateClient
- credential = DefaultAzureCredential()
- certificate_client = CertificateClient(vault_url="https://my-key-vault.vault.azure.net/", credential=credential)
- deleted_certificate_poller = certificate_client.begin_delete_certificate("cert-name")
- deleted_certificate = deleted_certificate_poller.result()
- print(deleted_certificate.name)
- print(deleted_certificate.deleted_on)
- print(updated_certificate.name)
- print(updated_certificate.properties.enabled)
- def listCertAzure():
- from azure.identity import DefaultAzureCredential
- from azure.keyvault.certificates import CertificateClient
- credential = DefaultAzureCredential()
- certificate_client = CertificateClient(vault_url="https://my-key-vault.vault.azure.net/", credential=credential)
- certificates = certificate_client.list_properties_of_certificates()
- for certificate in certificates:
- # this list doesn't include versions of the certificates
- print(certificate.name)
- # YANDEX
- def makeYandexTable(table_name):
- statement = f"""CREATE TABLE {table_name} (
- ( telemetry_timestamp timestamp,
- device_nm varchar(200),
- payload varchar(2000)
- );
- """
- return statement
- def sendCreateYandexTable(my_query):
- timestamp = datetime.datetime.now(pytz.timezone('Europe/Moscow')).timestamp() # set the timezone as you wish for your location
- event_ts=round(timestamp)
- insert= my_query
- data = {
- 'ca': YCERTNAME,
- 'path': '/myHome/', {
- 'database': 'pxc_cloud_db',
- 'query': insert,
- },
- 'port': YPORT,
- 'hostname': YHOST,
- }
- headers = {
- "Content-Type": "application/json",
- "Authorization": f"Api-Key {YAPI_KEY}",
- "User-Agent": "Python3"
- }
- senddatajson = json.dumps(data).encode("ascii")
- try:
- req = requests.post(YUrl, data=senddatajson, headers=headers)
- req.raise_for_status()
- except requests.exceptions.HTTPError as errh:
- print ("Http Error:",errh)
- except requests.exceptions.ConnectionError as errc:
- print ("Error Connecting:",errc)
- except requests.exceptions.Timeout as errt:
- print ("Timeout Error:",errt)
- except requests.exceptions.RequestException as err:
- print ("OOps: Something Else",err)
- # YANDEX
- def sendDataYandex(descrip1,temp_data1,descrip2,temp_data2):
- #msg = json.loads(payload_json)
- #msg_str = json.dumps(msg)
- msg_str = temp_data1
- timestamp = datetime.datetime.now(pytz.timezone('Europe/Moscow')).timestamp() # set the timezone as you wish for your location
- event_ts=round(timestamp)
- insert= f"""INSERT INTO pxc_cloud_db.timeseries_example (telemetry_timestamp , device_nm , payload) VALUES ('{event_ts}','{descrip1}', '{msg_str}')"""
- data = {
- 'ca': YCERTNAME,
- 'path': '/myHome/', {
- 'database': 'pxc_cloud_db',
- 'query': insert,
- },
- 'port': YPORT,
- 'hostname': YHOST,
- }
- headers = {
- "Content-Type": "application/json",
- "Authorization": f"Api-Key {YAPI_KEY}",
- "User-Agent": "Python3"
- }
- senddatajson = json.dumps(data).encode("ascii")
- try:
- req = requests.post(YUrl, data=senddatajson, headers=headers)
- req.raise_for_status()
- except requests.exceptions.HTTPError as errh:
- print ("Http Error:",errh)
- except requests.exceptions.ConnectionError as errc:
- print ("Error Connecting:",errc)
- except requests.exceptions.Timeout as errt:
- print ("Timeout Error:",errt)
- except requests.exceptions.RequestException as err:
- print ("OOps: Something Else",err)
- msg_str = temp_data2
- timestamp = datetime.datetime.now(pytz.timezone('Europe/Moscow')).timestamp() # set the timezone as you wish for your location
- event_ts=round(timestamp)
- insert= f"""INSERT INTO pxc_cloud_db.timeseries_example (telemetry_timestamp , device_nm , payload) VALUES ('{event_ts}','{descrip2}', '{msg_str}')"""
- data = {
- 'ca': YCERTNAME,
- 'path': '/myHome/', {
- 'database': 'pxc_cloud_db',
- 'query': insert,
- },
- 'port': YPORT,
- 'hostname': YHOST,
- }
- headers = {
- // 'X-ClickHouse-User': user,
- // 'X-ClickHouse-Key': password,
- "Content-Type": "application/json",
- "Authorization": f"Api-Key {YAPI_KEY}",
- "User-Agent": "Python3"
- }
- senddatajson = json.dumps(data).encode("ascii")
- try:
- req = requests.post(YUrl, data=senddatajson, headers=headers)
- req.raise_for_status()
- except requests.exceptions.HTTPError as errh:
- print ("Http Error:",errh)
- except requests.exceptions.ConnectionError as errc:
- print ("Error Connecting:",errc)
- except requests.exceptions.Timeout as errt:
- print ("Timeout Error:",errt)
- except requests.exceptions.RequestException as err:
- print ("OOps: Something Else",err)
- # GCS BLOB UPLOAD
- def uploadBlobGcs(descrip1,temp_data1,descrip2,temp_data2):
- """Writes json to file then uploads csv to the bucket on GCS """
- import csv
- from google.cloud import storage
- destination_blob_name = "your_data_1"
- source_file_name = f'/tmp/{destination_blob_name}'
- bucket_name = 'yourdataalarms'
- fieldnames = ['sensor1_desc', 'sensor1_temp', 'sensor2_desc', 'sensor2_temp', 'timestamp']
- json_obj = {}
- json_obj['sensor1_desc'] = descrip1
- json_obj['sensor1_temp'] = temp_data1
- json_obj['sensor2_desc'] = descrip2
- json_obj['sensor2_temp'] = temp_data2
- json_obj['timestamp'] = round(timestamp = datetime.datetime.now(pytz.timezone('Europe/Moscow')).timestamp())
- json_obj['timestamp'] = datetime.datetime.utcfromtimestamp(int(json_obj['timestamp'])//1000).strftime('%Y-%m-%d %H:%M:%S')
- with open(source_file_name, 'w') as f:
- writer = csv.DictWriter(f, fieldnames=fieldnames)
- writer.writeheader()
- writer.writerow(json_obj)
- storage_client = storage.Client()
- bucket = storage_client.bucket(bucket_name)
- blob = bucket.blob(destination_blob_name)
- blob.upload_from_filename(source_file_name)
- print("File {} uploaded to {}.".format(source_file_name, destination_blob_name))
- # MACHINIST
- def sendDataMachinist(descrip1,temp_data1,descrip2,temp_data2):
- data = {
- "agent": "GPIO",
- "metrics": [
- {
- "name": descrip1,
- "namespace": "STM-431J Temp Sensor",
- "data_point": {
- "value": temp_data1
- }
- }
- ]
- }
- headers = {
- "Content-Type": "application/json",
- "Authorization": "Bearer " + MAPIKEY,
- "User-Agent": "Python3"
- }
- senddatajson = json.dumps(data).encode("ascii")
- try:
- req = requests.post(MUrl, data=senddatajson, headers=headers)
- req.raise_for_status()
- except requests.exceptions.HTTPError as errh:
- print ("Http Error:",errh)
- except requests.exceptions.ConnectionError as errc:
- print ("Error Connecting:",errc)
- except requests.exceptions.Timeout as errt:
- print ("Timeout Error:",errt)
- except requests.exceptions.RequestException as err:
- print ("OOps: Something Else",err)
- data2 = {
- "agent": "GPIO",
- "metrics": [
- {
- "name": descrip2,
- "namespace": "STM-431J Temp Sensor",
- "data_point": {
- "value": temp_data2
- }
- }
- ]
- }
- senddatajson = json.dumps(data2).encode("ascii")
- try:
- req = requests.post(MUrl, data=senddatajson, headers=headers)
- req.raise_for_status()
- except requests.exceptions.HTTPError as errh:
- print ("Http Error:",errh)
- except requests.exceptions.ConnectionError as errc:
- print ("Error Connecting:",errc)
- except requests.exceptions.Timeout as errt:
- print ("Timeout Error:",errt)
- except requests.exceptions.RequestException as err:
- print ("OOps: Something Else",err)
- # TWILIO SMS
- def sendTwilioSMS(descrip1,temp_data1,descrip2,temp_data2):
- from twilio.rest import Client
- tw_account_sid = "Account SID"
- tw_auth_teoken = "AUTHTOKEN"
- tw_to_number = "Destination phone number"
- tw_from_number = "Twilio phone number for the trial obtained"
- client = Client(account_sid, auth_teoken)
- ts = round(datetime.datetime.now(pytz.timezone('Europe/Moscow')).timestamp()) # set the timezone as you wish for your location
- bodytext=" got the data at " + ts + " " + descrip1 + " : " + str(temp_data1) + " " + descrip2 + " : " + str(temp_data2)
- message = client.messages.create(body=bodytext,from_=from_number,to=to_number)
- print(message.sid)
- #return message.sid
- # SMTP EMAIL SERVER
- # to check library classes import inspect then
- # inspect.getmro(MIMEText)
- def sendSMTPemail(descrip1,temp_data1,descrip2,temp_data2):
- import smtplib
- #from email.mime.multipart import MIMEMultipart ---> you can uncomment insted of message = MIMEText(bodytext, "plain", 'utf-8') but i think its okay
- from email.mime.text import MIMEText
- try:
- server = smtplib.SMTP(smtp_server, port)
- server.starttls()
- login_address ="Enter the sender's email address"
- login_password ="Enter password"
- server.login(login_address, login_password)
- #message = MIMEMultipart()
- ts = round(datetime.datetime.now(pytz.timezone('Europe/Moscow')).timestamp()) # set the timezone as you wish for your location
- bodytext=" got the data at " + ts + " " + descrip1 + " : " + str(temp_data1) + " " + descrip2 + " : " + str(temp_data2)
- message = MIMEText(bodytext, "plain", 'utf-8')
- message['Subject']="Email Subject"
- message['From']="your eamil address"
- message['To']="recipient email address"
- #text = MIMEText(bodytext)
- #message.attach(text)
- server.send_message(message)
- server.quit()
- except requests.exceptions.RequestException as err:
- print ("SMTP connect error : ",err)
- # ------------- Ciphers / Cryptography -----------------
- # AES_GCM
- def aes_gcm_encrypt(key,iv,text):
- cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
- ciphertext, mac = cipher.encrypt_and_digest(text)
- return ciphertext, mac
- def aes_gcm_decrypt(key,iv,ciphertext,mac):
- plaintext = 0
- cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
- try:
- plaintext = cipher.decrypt_and_verify(ciphertext,mac)
- except (ValueError, KeyError):
- print("Incorrect decryption")
- return plaintext
- # CHACHA - POLY1305
- def chacha20_poly1305_encrypt(key,byte_data):
- from Crypto.Cipher import ChaCha20_Poly1305 as cha
- chacha20 = cha.new(key=key)
- cipher_text, mac = chacha20.encrypt_and_digest(byte_data)
- nonce = chacha20.nonce
- print('cipher_text: ', cipher_text)
- print('mac: ', mac)
- return cipher_text,nonce
- def chacha20_poly1305_decrypt(key,nonce,mac,cipher_text):
- from Crypto.Cipher import ChaCha20_Poly1305 as cha
- decrypt_data="no data"
- try:
- chacha_poly = cha.new(key=key, nonce=nonce)
- decrypt_data = chacha_poly.decrypt_and_verify(cipher_text, mac)
- print('The message is: ', decrypt_data)
- except:
- print("The message couldn't be verified")
- return decrypt_data
- # RSA_cryptography (Rivest-Shamir-Adleman)
- def RSA_encrypt(message, ):
- from Crypto.Cipher import PKCS1_OAEP
- from Crypto.PublicKey import RSA
- from binascii import hexlify
- #Generating private key (RsaKey object) of key length of 1024 bits
- private_key = RSA.generate(1024)
- #Generating the public key (RsaKey object) from the private key
- public_key = private_key.publickey()
- print(type(private_key), type(public_key))
- #Converting the RsaKey objects to string
- private_pem = private_key.export_key().decode()
- public_pem = public_key.export_key().decode()
- print(type(private_pem), type(public_pem))
- #Writing down the private and public keys to 'pem' files
- with open('private_pem.pem', 'w') as pr:
- pr.write(private_pem)
- with open('public_pem.pem', 'w') as pu:
- pu.write(public_pem)
- #Importing keys from files, converting it into the RsaKey object
- pr_key = RSA.import_key(open('private_pem.pem', 'r').read())
- pu_key = RSA.import_key(open('public_pem.pem', 'r').read())
- print(type(pr_key), type(pu_key))
- #Instantiating PKCS1_OAEP object with the public key for encryption
- cipher = PKCS1_OAEP.new(key=pu_key)
- #Encrypting the message with the PKCS1_OAEP object
- cipher_text = cipher.encrypt(message.encode('UTF-8'))
- print(cipher_text)
- def RSA_decrypt(cipher_text,pr_key):
- from Crypto.Cipher import PKCS1_OAEP
- #Instantiating PKCS1_OAEP object with the private key for decryption
- decrypt = PKCS1_OAEP.new(key=pr_key)
- #Decrypting the message with the PKCS1_OAEP object
- decrypted_message = decrypt.decrypt(cipher_text)
- print(decrypted_message)
- # common packers
- #
- # messagepack
- def msgPackPack(descrip1,temp_data1,descrip2,temp_data2):
- import msgpack
- json_obj = {}
- json_obj['sensor1_desc'] = descrip1
- json_obj['sensor1_temp'] = temp_data1
- json_obj['sensor2_desc'] = descrip2
- json_obj['sensor2_temp'] = temp_data2
- json_obj['timestamp'] = round(timestamp = datetime.datetime.now(pytz.timezone('Europe/Moscow')).timestamp())
- msgpack_data = msgpack.packb(json.dumps(json_obj, ensure_ascii=False))
- return msgpack_data
- def msgPackUnPack(msgpck_str):
- import msgpack
- msgs=[]
- for msg in msgpack.Unpacker(msgpck_str):
- print msg
- msgs.append(msg)
- return msgs
- # cbor
- def cborPack(descrip1,temp_data1,descrip2,temp_data2):
- import cbor
- json_obj = {}
- json_obj['sensor1_desc'] = descrip1
- json_obj['sensor1_temp'] = temp_data1
- json_obj['sensor2_desc'] = descrip2
- json_obj['sensor2_temp'] = temp_data2
- json_obj['timestamp'] = round(timestamp = datetime.datetime.now(pytz.timezone('Europe/Moscow')).timestamp())
- cbor_data = cbor.load(json.dumps(json_obj, ensure_ascii=False))
- return cbor_data
- def cborUnPack(cbor_str):
- import cbor
- msgs=[]
- for msg in cbor.dump(cbor_str):
- print msg
- msgs.append(msg)
- return msgs
- # SSL TLS Client connection to your own SSL server which echos the response back in reply
- def sendTLSClient(descrip1,temp_data1,descrip2,temp_data2):
- import socket, ssl
- # SSL with TLS
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) # refer to the options https://www.pyopenssl.org/en/21.0.0/api/ssl.html
- context.options |= ssl.OP_NO_TLSv1_3 # OP_NO_TLSv1_3 = Disable TLS1.3 PROTOCOL_TLS_CLIENT = tls client, PROTOCOL_TLS_SERVER = TLS Server
- #context.options |= ssl.OP_SINGLE_ECDH_USE example a new key will always be created when using ephemeral (Elliptic curve) Diffie-Hellman.
- context.check_hostname = False # do not check hostname
- context.load_verify_locations('cert.pem') # specify the certificate
- context.set_ciphers('kRSA') # specify the cipher suite
- # Create a socket and wrap it in an SSL socket
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- ssock = context.wrap_socket(sock)
- # Connect to the server and send data
- ssock.connect((SSLHOST,SSLPORT))
- ts = round(datetime.datetime.now(pytz.timezone('Europe/Moscow')).timestamp()) # set the timezone as you wish for your location
- msgdata = " got the data at " + ts + " " + descrip1 + " : " + str(temp_data1) + " " + descrip2 + " : " + str(temp_data2)
- ssock.sendall(bytes(msgdata + "\n", "utf-8"))
- # Receive data from the server and exit
- received = str(ssock.recv(1024), "utf-8")
- ssock.close()
- print("Sent: " + msgdata)
- print("Received: " + received)
- # SSL TLS Client connection to your own SSL server which echos the response back in reply
- def sendSSL23Client(descrip1,temp_data1,descrip2,temp_data2):
- import ssl
- import socket
- import pprint
- context = ssl.create_default_context()
- context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- context.verify_mode = ssl.CERT_NONE
- context.check_hostname = False
- conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=URL)
- conn.connect((SSL_URL, SSL_PORT))
- cert = conn.getpeercert()
- pprint.pprint(cert)
- ts = round(datetime.datetime.now(pytz.timezone('Europe/Moscow')).timestamp()) # set the timezone as you wish for your location
- msgdata = b" got the data at " + str(ts).encode('UTF-8') + b" " + descrip1.encode('UTF-8') + b" : " + str(temp_data1).encode('UTF-8') + b" " + descrip2.encode('UTF-8') + b" : " + str(temp_data2).encode('UTF-8') + b"\r\n\r\n"
- #conn.sendall(b"HEAD / HTTP/1.0\r\nHost: linuxfr.org\r\n\r\n")
- if AES_ENCRYPT == 1:
- from Crypto.Cipher import AES
- from Crypto.Random import get_random_bytes
- key = get_random_bytes(16)
- nonce = get_random_bytes(12)
- ciphertext, mac = aes_gcm_encrypt(key,nonce,msgdata)
- conn.sendall(ciphertext)
- indata = conn.recv(1024).split(b"\r\n"))
- pprint.pprint(aes_gcm_decrypt(key,nonce,indata,mac))
- else:
- conn.sendall(msgdata)
- indat = conn.recv(1024).split(b"\r\n"))
- pprint.pprint(indat)
- conn.close()
- # Choose the iOt you want to use according to the define in top section
- if telem_opt == "soracom":
- sendData=sendDataSoraCom
- elif telem_opt == "beebotte":
- client = mqtt.Client()
- client.username_pw_set("token:%s"%TOKEN)
- client.on_connect = on_connect
- client.on_disconnect = on_disconnect
- client.on_publish = on_publish
- client.tls_set(CACERT)
- client.connect(HOSTNAME, port=BPORT, keepalive=KEEP_ALIVE)
- sendData=sendDataBeebotte
- GTOPIC=BTOPIC
- elif telem_opt == "ubidots":
- from ubidots import ApiClient
- api = ApiClient(token="4b00-xxxxxxxxxxxxxxxxxxx", base_url="http://yourcompanyname.api.ubidots.com/api/v1.6/")
- sendData=sendDataUbiDots
- elif telem_opt == "machinist":
- sendData=sendDataMachinist
- elif telem_opt == "aws":
- import boto3
- sendData=sendDataAws
- elif telem_opt == "yandex":
- # un comment if you didnt already make the table
- # makeTableQuery=makeYandexTable("pxc_cloud_db.timeseries_example")
- # sendCreateYandexTable(makeTableQuery)
- sendData=sendDataYandex
- elif telem_opt == "azure":
- if AZURE_TYPE == "connection_string" :
- from azure.iot.device.aio import IoTHubDeviceClient
- from azure.iot.device import Message
- device_client = IoTHubDeviceClient.create_from_connection_string(AZURE_CONN_STR)
- else:
- from azure.iot.device.aio import ProvisioningDeviceClient
- from azure.iot.device.aio import IoTHubDeviceClient
- from azure.iot.device import Message
- provisioning_device_client = ProvisioningDeviceClient.create_from_symmetric_key(
- provisioning_host=provisioning_host,
- registration_id=registration_id,
- id_scope=id_scope,
- symmetric_key=symmetric_key,
- )
- registration_result = provisioning_device_client.register()
- print("The complete registration result is")
- print(registration_result.registration_state)
- if registration_result.status == "assigned":
- print("Your device has been provisioned. It will now begin sending telemetry.")
- device_client = IoTHubDeviceClient.create_from_symmetric_key(
- symmetric_key=symmetric_key,
- hostname=registration_result.registration_state.assigned_hub,
- device_id=registration_result.registration_state.device_id,
- )
- device_client.connect()
- sendData=sendDataAzure
- elif telem_opt == "twillio":
- sendData=sendTwilioSMS
- elif telem_opt == "smtp_email":
- sendData=sendSMTPemail
- elif telem_opt == "ssl_tls_server":
- sendData=sendTLSClient
- elif telem_opt == "ssl_23_server":
- sendData=sendSSL23Client
- elif telem_opt == "gcs_blob":
- sendData=uploadBlobGcs
- elif telem_opt = "cloud_mqtt":
- client = mqtt.Client(protocol=mqtt.MQTTv311)
- client.tls_set(CCACERT)
- client.username_pw_set("aircampro", "air987")
- client.on_connect = on_connect
- client.on_disconnect = on_disconnect
- client.on_publish = on_publish
- client.connect(CHOSTNAME, CPORT, KEEP_ALIVE)
- sendData=sendDataCloudMqtt
- GTOPIC=CTOPIC
- elif telem_opt == "splunk":
- sendData=sendDataSplunk
- else: # we assume its for mosquito broker internally running on host e.g. raspberry pi 4
- client = mqtt.Client()
- client.on_connect = on_connect
- client.on_disconnect = on_disconnect
- client.on_publish = on_publish
- client.connect("localhost", MQTT_PORT, KEEP_ALIVE)
- sendData=sendDataMosquito
- GTOPIC=MTOPIC
- # read the enOcean sensor
- sensor1_rdy = False
- sensor2_rdy = False
- cnt = 0
- dataLen = 0
- optLen = 0
- telegraph,headList,dataList,optList = [],[],[],[]
- ready = True
- while True:
- if sensor1_rdy and sensor2_rdy:
- break
- # 1byte Reads data from the serial port one by one.
- s_data = s_port.read().encode('hex') # read 1byte
- # Sync-Recognizes the start of Telegram data from Byte 0x55.。
- if s_data == '55' and ready: # Telegram start
- # valuable reset
- cnt = 0
- dataLen = 0
- optLen = 0
- ready = False
- telegraph,headList,dataList,optList = [],[],[],[]
- cnt += 1
- telegraph.append(s_data)
- # We are analyzing Telegram data.
- if 2 <= cnt <= 5: # header
- headList.append(s_data)
- if cnt == 5: # header end, get data length
- dataLen = int(headList[1],16)
- optLen = int(headList[2],16)
- if 7 <= cnt <= (6+dataLen): # data
- dataList.append(s_data)
- if (7+dataLen) <= cnt <= (6+dataLen+optLen): # optional data
- optList.append(s_data)
- if cnt == (6+dataLen+optLen+1): # Telegram end
- ready = True
- sensorId = ':'.join(dataList[1:5]) # Sensor ID
- timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- # The data is displayed for debugging purposes.
- print "========"
- print timestamp
- print("telegraph...:", ":".join(telegraph))
- print( "head...", ":".join(headList))
- print( "data...", ":".join(dataList), "(length=%d)" % dataLen)
- print( "opt ...", ":".join(optList), "(length=%d)" % optLen)
- print( "sensorID...", sensorId)
- # Temp sensor1 Send data when the sensor ID matches.
- if sensorId == SENSORID1 and sensor1_rdy == False:
- val = int(dataList[7],16)
- sensor1_temp = round((255.0-val)/255.0*40.0, 2)
- sensor_desc1 = 'temperature01 ' + SENSORID1 + " : "
- sensor1_rdy = True
- # Temp sensor1 Send data when the sensor ID matches.
- elif sensorId == SENSORID2 and sensor2_rdy == False:
- val = int(dataList[7],16)
- sensor2_temp = round((255.0-val)/255.0*40.0, 2)
- sensor_desc1 = 'temperature02 ' + SENSORID2 + " : "
- sensor2_rdy = True
- # Other sensors, ignore ID
- else:
- continue
- # return this data for webpage display via flask->uwgsi->nginx
- if sensor1_rdy and sensor2_rdy:
- sendData(sensor_desc1, sensor1_temp, sensor_desc2, sensor2_temp)
- return str(sensor1_temp),str(sensor2_temp)
- if __name__ == '__main__':
- # Open serial port for communication with enOcean sensors
- try:
- s_port = serial.Serial(PORT, BAUD_RT)
- print("open serial port: %s" % PORT)
- except:
- print("cannot open serial port: %s" % PORT)
- exit(1)
- t1,t2 = Enocean2Telemetry(s_port,MY_CURRENT_TELEM)
- print("sensorID..." % SENSORID1)
- print("temperature" % t1)
- print("sensorID..." % SENSORID2)
- print("temperature" % t2)
Add Comment
Please, Sign In to add comment