# File "Praktikum4.py"
# Full Path: /var/www/html/main/lecture-notes/Internet of Things/02 Materi Praktikum - IoT - PDF/File Python/Praktikum4.py
# File size: 2.46 KB
# MIME-type: text/x-script.python
# Charset: utf-8
# The MIT License (MIT)
# Copyright (c) 2019 Mike Teachman
# https://opensource.org/licenses/MIT
#
# Example MicroPython code showing how to use the MQTT protocol to
# publish data to a Thingspeak channel
#
# User configuration parameters are indicated with "ENTER_".
from machine import Pin
import network
from umqtt.robust import MQTTClient
import time
import gc
import sys
# GPIO 2 configured as an output; blinkPendek() toggles it after each publish.
p0 = Pin(2, Pin.OUT)


def blinkPendek():
    """Short blink ("pendek" = short): drive p0 to 0, then to 1, 100 ms each.

    The call blocks for roughly 200 ms in total and leaves the pin at 1.
    NOTE(review): on many boards the LED on GPIO 2 is active-low, in which
    case this reads as "on for 100 ms, then off" -- confirm for your board.
    """
    for level in (0, 1):
        p0.value(level)
        time.sleep_ms(100)
# Configuration parameters
# =======================================================
# NOTE(review): the MQTT values below look like real credentials rather than
# placeholders -- consider rotating them and keeping them out of shared files.

# WiFi network the device joins in station mode.
WIFI_SSID = "SSID"
WIFI_PASSWORD = "Password"

# MQTT credentials used when constructing the MQTTClient.
THINGSPEAK_MQTT_CLIENT_ID = b"EQILMjo7KQgVHTYFOwQ2GCU"
THINGSPEAK_MQTT_USERNAME = b"EQILMjo7KQgVHTYFOwQ2GCU"
THINGSPEAK_MQTT_PASSWORD = b"PxJsWN/nuQobedz69C7CZpup"

# Channel whose "channels/<id>/publish" topic receives the data.
THINGSPEAK_CHANNEL_ID = b"1907480"
# =======================================================
# Turn off the WiFi access-point interface; only station mode is used below.
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)

# Connect the device to the WiFi network in station mode.
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.connect(WIFI_SSID, WIFI_PASSWORD)

# Wait until the device is connected, polling once per second for at most
# MAX_ATTEMPTS polls.
MAX_ATTEMPTS = 20
attempt_count = 0
while not wifi.isconnected() and attempt_count < MAX_ATTEMPTS:
    attempt_count += 1
    time.sleep(1)

# Decide on the actual link state, not on the attempt counter: the link may
# come up during the final sleep, in which case attempt_count equals
# MAX_ATTEMPTS even though the connection succeeded.
if not wifi.isconnected():
    print('could not connect to the WiFi network')
    sys.exit()
# The username is overwritten with the client id here, so both values passed
# to the broker are always identical regardless of the configured username.
# NOTE(review): presumably because ThingSpeak MQTT devices use the same value
# for both -- confirm against the device credentials page.
THINGSPEAK_MQTT_USERNAME = THINGSPEAK_MQTT_CLIENT_ID

# Build the MQTT client for the ThingSpeak broker (ssl=False: no TLS).
client = MQTTClient(
    client_id=THINGSPEAK_MQTT_CLIENT_ID,
    server=b"mqtt3.thingspeak.com",
    user=THINGSPEAK_MQTT_USERNAME,
    password=THINGSPEAK_MQTT_PASSWORD,
    ssl=False,
)

# Open the broker connection; on any failure report it and stop the script.
try:
    client.connect()
except Exception as err:
    failure = 'could not connect to MQTT server {}{}'.format(type(err).__name__, err)
    print(failure)
    sys.exit()
# Continually publish two fields to a ThingSpeak channel using MQTT.
PUBLISH_PERIOD_IN_SEC = 5

# The topic depends only on the channel id, so build it once instead of
# re-allocating the same bytes object on every loop iteration.
publish_topic = bytes("channels/{:s}/publish".format(THINGSPEAK_CHANNEL_ID), 'utf-8')

while True:
    try:
        # field1 carries the current free heap size in bytes; field2 is the
        # fixed sample value 10.2.
        freeHeapInBytes = gc.mem_free()
        payload = bytes("field1={:.1f}&field2={:.1f}\n".format(freeHeapInBytes, 10.2), 'utf-8')
        client.publish(publish_topic, payload)
        # Visual confirmation that a publish call completed.
        blinkPendek()
        time.sleep(PUBLISH_PERIOD_IN_SEC)
    except KeyboardInterrupt:
        # Ctrl-C: close the MQTT session and drop the WiFi link, then stop.
        print('Ctrl-C pressed...exiting')
        client.disconnect()
        wifi.disconnect()
        break