[python] Simple Python 2.7 script to control Worx Landroid via Domoticz
Viewer
*** This page was generated with the meta tag "noindex, nofollow". This happened because you selected this option before saving or the system detected it as spam. This means that this page will never get into the search engines and the search bot will not crawl it. There is nothing to worry about, you can still share it with anyone.
- ##########################
- # This part runs stand-alone, with the username and password as arguments; the listening port can be given as an optional third argument:
- # Published for https://gathering.tweakers.net/forum/list_messages/1645152
- ##########################
- import requests
- import uuid
- import paho.mqtt.client as mqtt
- import base64
- from OpenSSL import crypto
- import ssl
- import web
- import sys
- import json
# OAuth client secret sent as "client_secret" in the password-grant login request.
CLIENT_SECRET = "nCH3A0WvMYn66vGorjSrnGZ2YtjQWDiCvjg7jNxK"
class WorxLandroid(object):
    """Client for the Worx Landroid cloud API and its MQTT command channel.

    ``login()`` obtains an API token, the MQTT endpoint, the mower MAC
    addresses and a client certificate; ``connect()`` then opens the MQTT
    session in a background network loop.  Status messages received from
    the mowers are cached per MAC address and forwarded to any webhook
    registered through ``subscribe()``.

    ``print`` is always called with a single argument so the class parses
    under both Python 2.7 and Python 3.
    """

    # Generated once per process; sent as "uuid" on login and reused in the
    # MQTT client id.
    UUID = str(uuid.uuid1())

    def __init__(self):
        self._token = ''
        self._mqtt_client_id = None
        self._mqtt_client_cert = None
        self._mqtt_endpoint = None
        self._mower_macs = []
        self._client = None
        # Last decoded status message per mower, keyed by MAC address.
        self._last_status = {}
        # Webhook URL template -> (mac, path expression).
        self._subscriptions = {}

    def _api_request(self, method, url, json=None):
        """Call the REST API and return the decoded JSON response.

        :param method: ``requests.get`` / ``requests.post`` (or compatible).
        :param url: path relative to ``https://api.worxlandroid.com/api/v2/``.
        :param json: optional request body; the name shadows the ``json``
            module inside this method only, which does not use the module.
        :raises requests.HTTPError: when the API answers with a 4xx/5xx status.
        """
        headers = {
            "Authorization": self._token,
        }
        # A timeout keeps a stalled API call from hanging the process forever.
        response = method('https://api.worxlandroid.com/api/v2/' + url,
                          json=json, headers=headers, timeout=30)
        # Fail with the HTTP error itself instead of a KeyError further on.
        response.raise_for_status()
        return response.json()

    def _connected(self, client, userdata, flags, rc):
        """MQTT on_connect callback: subscribe to every mower and poll once."""
        if rc != 0:
            # A non-zero result code means the broker refused the connection.
            print('[CONNECTION REFUSED] rc=%s' % (rc,))
            return
        print('[CONNECTED]')
        for mac in self._mower_macs:
            self._client.subscribe('DB510/%s/commandOut' % mac)
        self.poll()

    def _message_received(self, client, userdata, message):
        """MQTT on_message callback: cache the status and fire the webhooks."""
        import traceback

        print(message.payload)
        try:
            data = json.loads(message.payload)
            mac = data['dat']['mac']
        except (ValueError, KeyError, TypeError):
            # Malformed payload: log it and keep the network loop running.
            traceback.print_exc()
            print('NOK')
            return
        self._last_status[mac] = data

        # Iterate over a snapshot: the web server may add subscriptions from
        # another thread while this callback runs.
        for url, (sub_mac, path) in list(self._subscriptions.items()):
            if sub_mac != mac:
                continue
            expression = "self._last_status['%s']%s" % (mac, path)
            try:
                print(expression)
                # SECURITY: ``path`` is supplied by whoever calls the
                # /subscribe endpoint and is evaluated as Python code.  It is
                # kept because subscribers rely on arithmetic such as
                # "['cfg']['sc']['p']/2 + 50", but the HTTP port must never be
                # reachable by untrusted clients.
                val = json.dumps(eval(expression))
                target = url % (val,)
                print(target)
                # Timeout so a dead webhook cannot block the MQTT loop.
                requests.get(target, timeout=10)
                print('OK')
            except Exception:
                # Best effort: one failing webhook must not stop the others.
                traceback.print_exc()
                print('NOK')

    def _publish(self, mac=None, payload=None):
        """Publish ``payload`` to one mower, or to all mowers if ``mac`` is None."""
        print(payload)
        targets = self._mower_macs if mac is None else [mac]
        for target in targets:
            self._client.publish('DB510/%s/commandIn' % target, payload)

    def login(self, email, password):
        """Authenticate and fetch everything needed for the MQTT connection.

        Stores the token, MQTT endpoint, client id and mower MAC addresses on
        the instance, and writes the client certificate and private key to
        ``Client.pem`` in the current working directory.
        """
        credentials = {
            "username": email,
            "password": password,
            "grant_type": "password",
            "client_id": 1,
            "uuid": self.UUID,
            "type": "app",
            "scope": "*",
            "client_secret": CLIENT_SECRET
        }
        response_data = self._api_request(requests.post, 'oauth/token', credentials)
        self._token = response_data['token_type'] + ' ' + response_data['access_token']

        response_data = self._api_request(requests.get, 'users/me')
        self._mqtt_endpoint = response_data['mqtt_endpoint']
        self._mqtt_client_id = "android-" + self.UUID
        print('Token:  %s' % (self._token,))
        print('Endpoint:  %s' % (self._mqtt_endpoint,))
        print('Client ID: %s' % (self._mqtt_client_id,))

        response_data = self._api_request(requests.get, 'product-items')
        self._mower_macs = [item['mac_address'] for item in response_data]
        for index, mac in enumerate(self._mower_macs):
            print('Mower %d: %s' % (index, mac))

        response_data = self._api_request(requests.get, 'users/certificate')
        self._mqtt_client_cert = response_data['pkcs12']
        # Decode before opening the file so a bad certificate does not leave
        # an empty Client.pem behind.
        p12 = crypto.load_pkcs12(base64.b64decode(self._mqtt_client_cert))
        # Binary mode: the PEM dumps are byte strings.
        with open('Client.pem', 'wb') as pem_file:
            pem_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, p12.get_certificate()))
            pem_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, p12.get_privatekey()))

    def connect(self):
        """Open the TLS MQTT connection and start the background network loop."""
        self._client = mqtt.Client(client_id=self._mqtt_client_id)
        self._client.tls_set(certfile='Client.pem', tls_version=ssl.PROTOCOL_TLSv1_2)
        self._client.on_connect = self._connected
        self._client.on_message = self._message_received
        self._client.connect(self._mqtt_endpoint, 8883)
        self._client.loop_start()

    def disconnect(self):
        """Close the MQTT session and stop the background network loop."""
        if self._client is None:
            return
        # Disconnect first so the broker sees a clean shutdown, then stop the loop.
        self._client.disconnect()
        self._client.loop_stop()

    # Get the last status (of all robots if no MAC is specified)
    # Doesn't actively ask the robot for an update
    def get_status(self, mac=None):
        """Return the cached status as a JSON string.

        Returns ``null`` when no status has been received yet for ``mac``.
        """
        if mac is None:
            return json.dumps(self._last_status)
        return json.dumps(self._last_status.get(mac))

    # Instruct the robot to send an update
    def poll(self, mac=None):
        self._publish(mac, '{"sc":{"m":1}}')

    # Instruct the robot to start mowing
    def start(self, mac=None):
        self._publish(mac, '{"cmd":1}')

    # Instruct the robot to go home
    def go_home(self, mac=None):
        self._publish(mac, '{"cmd":3}')

    # Set the mow time percentage (-100 - 100)
    def set_percentage(self, pct, mac=None):
        # int() rejects anything that is not a plain integer, so a crafted
        # query string cannot inject extra JSON into the command.
        self._publish(mac, '{"sc":{"p":%s}}' % int(pct))

    # Set the rain delay
    def set_rain_delay(self, delay, mac=None):
        # Same integer coercion as set_percentage, for the same reason.
        self._publish(mac, '{"rd":%s}' % int(delay))

    # Update the schedule of a particular day
    def set_schedule(self, day, start, end, edge, mac):
        # The cached schedule list is modified in place and sent back whole.
        schedules = self._last_status[mac]['cfg']['sc']['d']
        schedules[int(day)] = [str(start), int(end), int(edge)]
        encoded = json.dumps(schedules)
        print(encoded)
        self._publish(mac, '{"sc":{"d":%s}}' % encoded)

    # Update the starting distance for a given zone index
    def set_zone_distance(self, zone, distance, mac):
        # The cached distance list is modified in place and sent back whole.
        distances = self._last_status[mac]['cfg']['mz']
        distances[int(zone)] = int(distance)
        self._publish(mac, '{"mz":%s}' % json.dumps(distances))

    # Keep on mowing the same zone (until next change)
    def force_zone(self, zone, mac=None):
        self._publish(mac, '{"mzv":%s}' % json.dumps([int(zone)] * 10))

    # Subscribe to parts of the data using webhooks
    def subscribe(self, url, mac, path):
        """Register ``url`` (a template with one ``%s``) for updates of ``mac``.

        ``path`` is appended to the cached status of ``mac`` and evaluated;
        see the security note in ``_message_received``.
        """
        self._subscriptions[url] = (mac, path)
        print(self._subscriptions)

    def unsubscribe(self, url):
        """Remove the webhook registered for ``url``; unknown URLs are ignored."""
        self._subscriptions.pop(url, None)
class WebApp(web.application):
    """web.py application whose listening port can be chosen by the caller."""

    def run(self, port=8080, *middleware):
        """Serve the application on all interfaces at ``port``."""
        wsgi_app = self.wsgifunc(*middleware)
        listen_address = ('0.0.0.0', port)
        return web.httpserver.runsimple(wsgi_app, listen_address)
class get_status(object):
    """Handler for /get_status: return the cached mower status."""

    def GET(self):
        # ``mac`` is optional; None is passed through when it is absent.
        params = web.input(mac=None)
        return landroid.get_status(params.mac)
class poll(object):
    """Handler for /poll: ask the mower(s) to send a status update."""

    def GET(self):
        # ``mac`` is optional; None is passed through when it is absent.
        params = web.input(mac=None)
        landroid.poll(params.mac)
        return 'OK'
class start(object):
    """Handler for /start: tell the mower(s) to start mowing."""

    def GET(self):
        # ``mac`` is optional; None is passed through when it is absent.
        params = web.input(mac=None)
        landroid.start(params.mac)
        return 'OK'
class go_home(object):
    """Handler for /go_home: send the mower(s) back home."""

    def GET(self):
        # ``mac`` is optional; None is passed through when it is absent.
        params = web.input(mac=None)
        landroid.go_home(params.mac)
        return 'OK'
class set_percentage(object):
    """Handler for /set_percentage: set the mow time percentage."""

    def GET(self):
        # ``pct`` falls back to 0 and ``mac`` to None when not supplied.
        params = web.input(mac=None, pct=0)
        landroid.set_percentage(params.pct, params.mac)
        return 'OK'
class set_rain_delay(object):
    """Handler for /set_rain_delay: set the rain delay."""

    def GET(self):
        # ``delay`` falls back to 60 and ``mac`` to None when not supplied.
        params = web.input(mac=None, delay=60)
        landroid.set_rain_delay(params.delay, params.mac)
        return 'OK'
class set_schedule(object):
    """Handler for /set_schedule: replace the schedule entry of one day."""

    def GET(self):
        # ``mac`` has no default, so declare it as required: web.input then
        # answers 400 Bad Request when it is missing instead of failing
        # later with an AttributeError (HTTP 500).
        params = web.input('mac', day=0, start="00:00", end=1439, edge=0)
        landroid.set_schedule(params.day, params.start, params.end, params.edge, params.mac)
        return 'OK'
class set_zone_distance(object):
    """Handler for /set_zone_distance: set the start distance of one zone."""

    def GET(self):
        # All three parameters are mandatory; declaring them as required
        # makes web.input answer 400 Bad Request when one is missing instead
        # of failing later with an AttributeError (HTTP 500).
        params = web.input('zone', 'distance', 'mac')
        landroid.set_zone_distance(params.zone, params.distance, params.mac)
        return 'OK'
class force_zone(object):
    """Handler for /force_zone: keep mowing the given zone."""

    def GET(self):
        # ``zone`` has no default, so declare it as required: a missing value
        # yields 400 Bad Request instead of an AttributeError (HTTP 500).
        # ``mac`` stays optional.
        params = web.input('zone', mac=None)
        landroid.force_zone(params.zone, params.mac)
        return 'OK'
class subscribe(object):
    """Handler for /subscribe: register a webhook for part of a mower's status."""

    def GET(self):
        # ``url`` and ``mac`` have no defaults, so declare them as required:
        # a missing value yields 400 Bad Request instead of an AttributeError
        # (HTTP 500).  ``path`` defaults to the empty string.
        params = web.input('url', 'mac', path='')
        landroid.subscribe(params.url, params.mac, params.path)
        return 'OK'
# Module-level mower client shared by the HTTP handler classes and by main().
landroid = WorxLandroid()
def main(argv):
    """Log in, connect to MQTT and serve the HTTP control endpoints.

    :param argv: ``[program, email, password]`` with an optional fourth
        element giving the listening port (default 8080).
    :raises SystemExit: with a usage message when email or password is missing.
    """
    if len(argv) < 3:
        program = argv[0] if argv else 'landroid'
        sys.exit('Usage: %s <email> <password> [port]' % program)
    # Parse the port before any network activity so a typo fails immediately.
    port = int(argv[3]) if len(argv) > 3 else 8080

    urls = (
        '/get_status', 'get_status',
        '/poll', 'poll',
        '/start', 'start',
        '/go_home', 'go_home',
        '/set_percentage', 'set_percentage',
        '/set_rain_delay', 'set_rain_delay',
        '/set_schedule', 'set_schedule',
        '/set_zone_distance', 'set_zone_distance',
        '/force_zone', 'force_zone',
        '/subscribe', 'subscribe',
    )
    app = WebApp(urls, globals())
    landroid.login(argv[1], argv[2])
    landroid.connect()
    try:
        app.run(port)
    finally:
        # Stop the MQTT loop even when the web server exits with an error.
        landroid.disconnect()


if __name__ == "__main__":
    main(sys.argv)
- ##########################
- # This is the code used in Domoticz (Python script):
- ##########################
- from urllib import parse
# Base URL of the Landroid bridge script and the mower it should address.
LANDROID_URL = 'http://127.0.0.1:8088/'
LANDROID_MAC = '123456ABCDEF'
# Command URL template: the single '%s' is filled with the endpoint name.
LANDROID_CMD = ''.join((LANDROID_URL, '%s', '?mac=', LANDROID_MAC))
# Base URL used to build the Domoticz JSON API requests.
DOMOTICZ_BASE_URL = 'http://127.0.0.1:8080/'
def sub_device(idx, path):
    """Register a Landroid webhook that updates Domoticz device ``idx``.

    :param idx: Domoticz device index (integer) that receives the value.
    :param path: path expression into the mower status, passed to the
        bridge's ``subscribe`` endpoint.
    """
    # '&param=' had been corrupted to the pilcrow sign by HTML-entity decoding
    # ('&para'); without it Domoticz does not see a 'param' argument.
    # '%%s' leaves a literal '%s' in the URL for the bridge to fill in later.
    url = DOMOTICZ_BASE_URL + 'json.htm?type=command&param=udevice&idx=%d&nvalue=0&svalue=%%s' % idx
    request = LANDROID_CMD % 'subscribe' + '&url=%s&path=%s' % (parse.quote(url), parse.quote(path))
    log(subprocess.run(['curl', request]))
def subscribe():
    """Register one Landroid webhook per Domoticz device listed below."""
    log('Subscribing to Landroid updates')
    # (Domoticz device index, path expression into the mower status)
    watched = (
        (147, "['dat']['bt']['p']"),         # Landroid battery percentage
        (148, "['dat']['bt']['v']"),         # Landroid battery voltage
        (149, "['dat']['bt']['t']"),         # Landroid battery temperature
        (155, "['dat']['bt']['c']"),         # Landroid raw charging bit
        (150, "['dat']['ls']"),              # Landroid raw status byte
        (151, "['dat']['le']"),              # Landroid raw error byte
        (154, "['cfg']['sc']['p']/2 + 50"),  # Landroid active percentage (0-100%)
        (166, "['dat']['st']['d']"),         # Landroid distance
        (167, "['dat']['st']['wt']"),        # Landroid work time
    )
    for device_idx, status_path in watched:
        sub_device(device_idx, status_path)
Editor
You can edit this paste and save as new: