File "Praktikum4.py"

Full Path: /var/www/html/main/lecture-notes/Internet of Things/02 Materi Praktikum - IoT - PDF/File Python/Praktikum4.py
File size: 2.46 KB
MIME-type: text/x-script.python
Charset: utf-8

 
Open Back
# The MIT License (MIT)
# Copyright (c) 2019 Mike Teachman
# https://opensource.org/licenses/MIT
# 
# Example MicroPython code showing how to use the MQTT protocol to  
# publish data to a Thingspeak channel
#
# User configuration parameters are indicated with "ENTER_".  

from machine import Pin
import network
from umqtt.robust import MQTTClient
import time
import gc
import sys

p0 = Pin(2, Pin.OUT)

# Blink Pendek 50ms
def blinkPendek():
    p0.value(0)        
    time.sleep_ms(100)
    p0.value(1)
    time.sleep_ms(100)

# Configuration parameters
#=======================================================
WIFI_SSID = 'SSID'
WIFI_PASSWORD = 'Password'
THINGSPEAK_MQTT_CLIENT_ID = b"EQILMjo7KQgVHTYFOwQ2GCU"
THINGSPEAK_MQTT_USERNAME = b"EQILMjo7KQgVHTYFOwQ2GCU"
THINGSPEAK_MQTT_PASSWORD = b"PxJsWN/nuQobedz69C7CZpup"
THINGSPEAK_CHANNEL_ID = b'1907480'
#=======================================================

# turn off the WiFi Access Point
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)

# connect the device to the WiFi network
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.connect(WIFI_SSID, WIFI_PASSWORD)

# wait until the device is connected to the WiFi network
MAX_ATTEMPTS = 20
attempt_count = 0
while not wifi.isconnected() and attempt_count < MAX_ATTEMPTS:
    attempt_count += 1
    time.sleep(1)

if attempt_count == MAX_ATTEMPTS:
    print('could not connect to the WiFi network')
    sys.exit()
  
THINGSPEAK_MQTT_USERNAME = THINGSPEAK_MQTT_CLIENT_ID

client = MQTTClient(server=b"mqtt3.thingspeak.com",
                    client_id=THINGSPEAK_MQTT_CLIENT_ID, 
                    user=THINGSPEAK_MQTT_USERNAME, 
                    password=THINGSPEAK_MQTT_PASSWORD, 
                    ssl=False)
                    
try:            
    client.connect()
except Exception as e:
    print('could not connect to MQTT server {}{}'.format(type(e).__name__, e))
    sys.exit()

# continually publish two fields to a Thingspeak channel using MQTT
PUBLISH_PERIOD_IN_SEC = 5
while True:
    try:
        freeHeapInBytes = gc.mem_free()
        credentials = bytes("channels/{:s}/publish".format(THINGSPEAK_CHANNEL_ID), 'utf-8')  
        payload = bytes("field1={:.1f}&field2={:.1f}\n".format(freeHeapInBytes, 10.2), 'utf-8')
        client.publish(credentials, payload)
        blinkPendek()
        time.sleep(PUBLISH_PERIOD_IN_SEC)
    except KeyboardInterrupt:
        print('Ctrl-C pressed...exiting')
        client.disconnect()
        wifi.disconnect()
        break