Skip to content
Permalink
c40547151e
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
104 lines (86 sloc) 5.25 KB
# Import necessary modules from the cryptography library
from cryptography.hazmat.primitives import hashes # Hash functions for key derivation
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC # Key derivation function
from cryptography.hazmat.backends import default_backend # Default backend for cryptography
from cryptography.hazmat.primitives import serialization # Functions for serializing cryptographic objects
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes # Symmetric encryption primitives
import os # Operating system dependent functionality (used for random bytes generation)
# Define a function to generate a key using PBKDF2 key derivation
def generate_key(password, salt, algorithm='AES', key_size=32, iterations=100000):
    """Derive a symmetric key from a password with PBKDF2-HMAC-SHA256.

    Args:
        password: Secret to stretch. A ``str`` is UTF-8 encoded first;
            bytes-like values are passed to the KDF unchanged.
        salt: Salt bytes mixed into the derivation.
        algorithm: Not used by the derivation; kept so existing callers that
            pass it keep working.
        key_size: Length of the derived key in bytes.
        iterations: PBKDF2 iteration count. The default stays at 100000 so
            keys derived by existing callers do not change.

    Returns:
        The derived key as ``bytes`` of length ``key_size``.
    """
    # Only text needs encoding; calling .encode() on bytes would raise.
    secret = password.encode() if isinstance(password, str) else password
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=key_size,
        salt=salt,
        iterations=iterations,
        backend=default_backend(),
    )
    return kdf.derive(secret)
# Define a function to encrypt a message using AES encryption
def encrypt_message(message, key, algorithm='AES'):
    """Encrypt a message with AES in CFB mode under a fresh random IV.

    Args:
        message: Plaintext. A ``str`` is UTF-8 encoded first; bytes-like
            values are encrypted as given.
        key: AES key bytes.
        algorithm: Cipher selector; only ``'AES'`` is supported.

    Returns:
        bytes: The 16-byte IV followed by the ciphertext.

    Raises:
        ValueError: If ``algorithm`` is anything other than ``'AES'``.
    """
    if algorithm != 'AES':
        raise ValueError("Unsupported encryption algorithm")

    # Only text needs encoding; calling .encode() on bytes would raise.
    plaintext = message.encode() if isinstance(message, str) else message

    # The IV is random per message and is prepended so the receiver can
    # recover it. NOTE: CFB gives confidentiality only; the output is not
    # authenticated.
    iv = os.urandom(16)
    cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend())
    encryptor = cipher.encryptor()
    return iv + encryptor.update(plaintext) + encryptor.finalize()
# Define a function to decrypt an encrypted message using AES decryption
def decrypt_message(encrypted_message, key, algorithm='AES'):
    """Decrypt an ``IV + ciphertext`` blob with AES-CFB and return it as text.

    Args:
        encrypted_message: Bytes whose first 16 bytes are the IV and whose
            remainder is the ciphertext.
        key: AES key bytes used for encryption.
        algorithm: Cipher selector; only ``'AES'`` is supported.

    Returns:
        str: The decrypted bytes decoded with the default codec.

    Raises:
        ValueError: If ``algorithm`` is anything other than ``'AES'``.
    """
    if algorithm != 'AES':
        raise ValueError("Unsupported encryption algorithm")

    # Split the blob at the IV boundary.
    iv, ciphertext = encrypted_message[:16], encrypted_message[16:]
    decryptor = Cipher(
        algorithms.AES(key), modes.CFB(iv), backend=default_backend()
    ).decryptor()
    plaintext = decryptor.update(ciphertext) + decryptor.finalize()
    return plaintext.decode()
# Main block of the code
if __name__ == "__main__":
    # Demo round trip: derive a key, encrypt a sample message, decrypt it.
    demo_password = "MySecretPassword"
    demo_salt = os.urandom(16)  # fresh random salt on every run
    derived_key = generate_key(demo_password, demo_salt)
    print("Generated key:", derived_key)

    plaintext = "Hello, how are you?."
    token = encrypt_message(plaintext, derived_key)
    print("Encrypted message:", token)

    # Decrypting with the same key should print the original text.
    print("Decrypted message:", decrypt_message(token, derived_key))
# //////////////////////
# TEST CODE (NOT USED IN MQTT PROJECT)
# Import the necessary MQTT client library
import paho.mqtt.client as mqtt
import base64 # Import base64 library for encoding
from Crypto.Cipher import AES # Import AES encryption algorithm
from Crypto.Random import get_random_bytes # Import function to generate random bytes
# Configuration
# Hostname the client set up later in this file connects to (port 1883).
# NOTE(review): presumably a public test broker — confirm it is still reachable.
BROKER_ADDRESS = "mqtt.eclipse.org" # Example MQTT broker address
# Function to generate a random key and initialization vector (IV)
def generate_key_and_iv():
    """Return a fresh random ``(key, iv)`` pair: 32 key bytes and 16 IV bytes."""
    # The key is drawn first, then the IV, matching the order of the tuple.
    return get_random_bytes(32), get_random_bytes(16)
# Callback function executed when the client connects to the MQTT broker
def on_connect(client, userdata, flags, rc):
    """Publish one encrypted greeting once the broker accepts the connection.

    Args:
        client: The MQTT client that connected; used to publish.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 means the broker accepted the
            connection.
    """
    # A non-zero result code means the connection was refused, so there is
    # no session to publish on.
    if rc != 0:
        return
    client.publish("encrypted_topic", encode_data_for_transmission(b"Hello, MQTT!"))
# Function to encode data for transmission
def encode_data_for_transmission(data, key=None):
    """Encrypt ``data`` with AES-CFB and return base64 text of IV + ciphertext.

    Args:
        data: Plaintext bytes to encrypt.
        key: Optional AES key bytes shared with the receiver. When omitted,
            a random key is generated and discarded after the call (the
            previous behaviour), so the payload cannot be decrypted later.

    Returns:
        str: Base64 encoding of the 16-byte IV followed by the ciphertext,
        decoded as UTF-8 text.
    """
    if key is None:
        key, iv = generate_key_and_iv()
    else:
        # A caller-supplied key still gets a fresh random IV per message.
        iv = get_random_bytes(16)

    # NOTE(review): PyCryptodome's MODE_CFB appears to default to 8-bit
    # segments, while decrypt_message in this file uses cryptography's
    # full-block CFB — confirm before mixing the two.
    cipher = AES.new(key, AES.MODE_CFB, iv)
    ciphertext = cipher.encrypt(data)
    return base64.b64encode(iv + ciphertext).decode("utf-8")
# Build the client and register the connect callback before dialing out.
client = mqtt.Client()
client.on_connect = on_connect
# Open the connection to the broker on port 1883.
client.connect(BROKER_ADDRESS, 1883)
# Start the network loop so the callback can fire.
# NOTE(review): loop_start() returns immediately — confirm the process stays
# alive long enough for the publish to happen.
client.loop_start()
# /////////////////////