[python] Simple Python 2.7 script to control Worx Landroid via Domoticz

Viewer

copydownloadembedprintName: Simple Python 2.7 script to control Worx Landroid via Domoticz
  1.  
  2. ##########################
  3. # This is running stand-alone with username and password as argument, listening port can be specified as optional third argument:
  4. # Published for https://gathering.tweakers.net/forum/list_messages/1645152
  5. ##########################
  6.  
  7. import requests
  8. import uuid
  9. import paho.mqtt.client as mqtt
  10. import base64
  11. from OpenSSL import crypto
  12. import ssl
  13. import web
  14. import sys
  15. import json
  16.  
  17. CLIENT_SECRET = 'nCH3A0WvMYn66vGorjSrnGZ2YtjQWDiCvjg7jNxK'
  18.  
  19.  
  20. class WorxLandroid(object):
  21.     UUID = str(uuid.uuid1())
  22.  
  23.     def __init__(self):
  24.         self._token = ''
  25.  
  26.         self._mqtt_client_id = None
  27.         self._mqtt_client_cert = None
  28.         self._mqtt_endpoint = None
  29.         self._mower_macs = []
  30.  
  31.         self._client = None
  32.         self._last_status = {}
  33.         self._subscriptions = {}
  34.  
  35.     def _api_request(self, method, url, json=None):
  36.         headers = {
  37.             "Authorization"self._token,
  38.         }
  39.  
  40.         r = method('https://api.worxlandroid.com/api/v2/' + url, json=json, headers=headers)
  41.         return r.json()
  42.  
  43.     def _connected(self, client, userdata, flags, rc):
  44.         print '[CONNECTED]'
  45.         for mac in self._mower_macs:
  46.             self._client.subscribe('DB510/%s/commandOut' % mac)
  47.         self.poll()
  48.  
  49.     def _message_received(self, client, userdata, message):
  50.         print message.payload
  51.         data = json.loads(message.payload)
  52.         self._last_status[data['dat']['mac']] = data
  53.  
  54.         for url, (mac, path) in self._subscriptions.iteritems():
  55.             if mac == data['dat']['mac']:
  56.                 try:
  57.                     path = "['%s']%s" % (mac, path)
  58.                     print 'self._last_status' + path
  59.                     val = json.dumps(eval('self._last_status' + path))
  60.                     print url % (val,)
  61.                     requests.get(url % (val,))
  62.                     print 'OK'
  63.                 except Exception:
  64.                     import traceback
  65.                     traceback.print_exc()
  66.                     print 'NOK'
  67.  
  68.     def _publish(self, mac=None, payload=None):
  69.         print payload
  70.         if mac is None:
  71.             for mac in self._mower_macs:
  72.                 self._client.publish('DB510/%s/commandIn' % mac, payload)
  73.         else:
  74.             self._client.publish('DB510/%s/commandIn' % mac, payload)
  75.  
  76.     def login(self, email, password):
  77.         json = {
  78.             "username"email,
  79.             "password": password,
  80.             "grant_type""password",
  81.             "client_id"1,
  82.             "uuid":     self.UUID,
  83.             "type":     "app",
  84.             "scope":    "*",
  85.             "client_secret": CLIENT_SECRET
  86.         }
  87.         response_data = self._api_request(requests.post, 'oauth/token', json)
  88.         self._token = response_data['token_type'] + ' ' + response_data['access_token']
  89.         
  90.         response_data = self._api_request(requests.get, 'users/me')
  91.         self._mqtt_endpoint = response_data['mqtt_endpoint']
  92.         
  93.         self._mqtt_client_id = "android-" + self.UUID
  94.         
  95.         print 'Token:    ', self._token
  96.         print 'Endpoint: ', self._mqtt_endpoint
  97.         print 'Client ID:', self._mqtt_client_id
  98.  
  99.         response_data = self._api_request(requests.get, 'product-items')
  100.         self._mower_macs = [data['mac_address'] for data in response_data]
  101.         for index, mac in enumerate(self._mower_macs):
  102.             print 'Mower %d:   %s' % (index, mac)
  103.  
  104.         response_data = self._api_request(requests.get, 'users/certificate')
  105.         self._mqtt_client_cert = response_data['pkcs12']
  106.  
  107.         with open('Client.pem', 'w') as file:
  108.             p12 = crypto.load_pkcs12(base64.b64decode(self._mqtt_client_cert))
  109.             file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, p12.get_certificate()))
  110.             file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, p12.get_privatekey()))
  111.  
  112.     def connect(self):
  113.         self._client = mqtt.Client(client_id=self._mqtt_client_id)
  114.         self._client.tls_set(certfile='Client.pem', tls_version=ssl.PROTOCOL_TLSv1_2)
  115.         self._client.on_connect = self._connected
  116.         self._client.on_message = self._message_received
  117.         self._client.connect(self._mqtt_endpoint, 8883)
  118.         self._client.loop_start()
  119.  
  120.     def disconnect(self):
  121.         self._client.loop_stop()
  122.  
  123.     # Get the last status (of all robots if no MAC is specified)
  124.     # Doesn't actively ask the robot for an update
  125.     def get_status(self, mac=None):
  126.         if mac is None:
  127.             return json.dumps(self._last_status)
  128.         else:
  129.             return json.dumps(self._last_status[mac])
  130.  
  131.     # Instruct the robot to send an update
  132.     def poll(self, mac=None):
  133.         self._publish(mac, '{"sc":{"m":1}}')
  134.  
  135.     # Instruct the robot to start mowing
  136.     def start(self, mac=None):
  137.         self._publish(mac, '{"cmd":1}')
  138.  
  139.     # Instruct the robot to go home
  140.     def go_home(self, mac=None):
  141.         self._publish(mac, '{"cmd":3}')
  142.  
  143.     # Set the mow time percentage (-100 - 100)
  144.     def set_percentage(self, pct, mac=None):
  145.         self._publish(mac, '{"sc":{"p":%s}}' % pct)
  146.  
  147.     # Set the rain delay
  148.     def set_rain_delay(self, delay, mac=None):
  149.         self._publish(mac, '{"rd":%s}' % delay)
  150.  
  151.     # Update the schedule of a particular day
  152.     def set_schedule(self, day, start, end, edge, mac):
  153.         schedules = self._last_status[mac]['cfg']['sc']['d']
  154.         schedules[int(day)] = [str(start), int(end), int(edge)]
  155.         print json.dumps(schedules)
  156.         self._publish(mac, '{"sc":{"d":%s}}' % json.dumps(schedules))
  157.  
  158.     # Update the starting distance for a given zone index
  159.     def set_zone_distance(self, zone, distance, mac):
  160.         distances = self._last_status[mac]['cfg']['mz']
  161.         distances[int(zone)] = int(distance)
  162.         self._publish(mac, '{"mz":%s}' % json.dumps(distances))
  163.  
  164.     # Keep on mowing the same zone (until next change)
  165.     def force_zone(self, zone, mac=None):
  166.         self._publish(mac, '{"mzv":%s}' % json.dumps([int(zone)]*10))
  167.  
  168.     # Subscribe to parts of the data using webhooks
  169.     def subscribe(self, url, mac, path):
  170.         self._subscriptions[url] = (mac, path)
  171.         print self._subscriptions
  172.  
  173.     def unsubscribe(self, url):
  174.         if url in self._subscriptions:
  175.             del self._subscriptions[url]
  176.  
  177.  
  178.  
  179. class WebApp(web.application):
  180.     def run(self, port=8080, *middleware):
  181.         func = self.wsgifunc(*middleware)
  182.         return web.httpserver.runsimple(func, ('0.0.0.0', port))
  183.  
  184.  
  185. class get_status(object):
  186.     def GET(self):
  187.         data = web.input(mac=None)
  188.         return landroid.get_status(data.mac)
  189.  
  190.  
  191. class poll(object):
  192.     def GET(self):
  193.         data = web.input(mac=None)
  194.         landroid.poll(data.mac)
  195.         return 'OK'
  196.  
  197.  
  198. class start(object):
  199.     def GET(self):
  200.         data = web.input(mac=None)
  201.         landroid.start(data.mac)
  202.         return 'OK'
  203.  
  204.  
  205. class go_home(object):
  206.     def GET(self):
  207.         data = web.input(mac=None)
  208.         landroid.go_home(data.mac)
  209.         return 'OK'
  210.  
  211.  
  212. class set_percentage(object):
  213.     def GET(self):
  214.         data = web.input(mac=None, pct=0)
  215.         landroid.set_percentage(data.pct, data.mac)
  216.         return 'OK'
  217.  
  218.  
  219. class set_rain_delay(object):
  220.     def GET(self):
  221.         data = web.input(mac=None, delay=60)
  222.         landroid.set_rain_delay(data.delay, data.mac)
  223.         return 'OK'
  224.  
  225. class set_schedule(object):
  226.     def GET(self):
  227.         data = web.input(day=0, start="00:00", end=1439, edge=0)
  228.         landroid.set_schedule(data.day, data.start, data.end, data.edge, data.mac)
  229.         return 'OK'
  230.  
  231. class set_zone_distance(object):
  232.     def GET(self):
  233.         data = web.input()
  234.         landroid.set_zone_distance(data.zone, data.distance, data.mac)
  235.         return 'OK'
  236.  
  237.  
  238. class force_zone(object):
  239.     def GET(self):
  240.         data = web.input(mac=None)
  241.         landroid.force_zone(data.zone, data.mac)
  242.         return 'OK'
  243.  
  244.  
  245. class subscribe(object):
  246.     def GET(self):
  247.         data = web.input(path='')
  248.         landroid.subscribe(data.url, data.mac, data.path)
  249.         return 'OK'
  250.  
  251.  
  252. landroid = WorxLandroid()
  253.  
  254.  
  255. def main(argv):
  256.     urls = (
  257.         '/get_status', 'get_status',
  258.         '/poll', 'poll',
  259.         '/start', 'start',
  260.         '/go_home', 'go_home',
  261.         '/set_percentage', 'set_percentage',
  262.         '/set_rain_delay', 'set_rain_delay',
  263.         '/set_schedule', 'set_schedule',
  264.         '/set_zone_distance', 'set_zone_distance',
  265.         '/force_zone', 'force_zone',
  266.         '/subscribe', 'subscribe',
  267.     )
  268.     app = WebApp(urls, globals())
  269.  
  270.     landroid.login(argv[1], argv[2])
  271.     landroid.connect()
  272.     app.run(int(argv[3]) if len(argv) == 4 else 8080)
  273.     landroid.disconnect()
  274.  
  275.  
  276. if __name__ == "__main__":
  277.     main(sys.argv)
  278.  
  279.  
  280.  
  281. ##########################
  282. # This is the code used in Domoticz (Python script):
  283. ##########################
  284. from urllib import parse
  285.  
  286. LANDROID_URL = 'http://127.0.0.1:8088/'
  287. LANDROID_MAC = '123456ABCDEF'
  288. LANDROID_CMD = LANDROID_URL + '%s' + '?mac=' + LANDROID_MAC
  289. DOMOTICZ_BASE_URL = 'http://127.0.0.1:8080/'
  290.  
  291. def sub_device(idx, path):
  292.     url = DOMOTICZ_BASE_URL + 'json.htm?type=command&param=udevice&idx=%d&nvalue=0&svalue=%%s' % idx
  293.     log(subprocess.run(['curl', LANDROID_CMD % 'subscribe' + '&url=%s&path=%s' % (parse.quote(url), parse.quote(path))]))
  294.  
  295.  
  296. def subscribe():
  297.     log('Subscribing to Landroid updates')
  298.     sub_device(147, "['dat']['bt']['p']") # Landroid battery percentage
  299.     sub_device(148, "['dat']['bt']['v']") # Landroid battery voltage  
  300.     sub_device(149, "['dat']['bt']['t']") # Landroid battery temperature
  301.     sub_device(155, "['dat']['bt']['c']") # Landroid raw charging bit
  302.     sub_device(150, "['dat']['ls']")      # Landroid raw status   byte
  303.     sub_device(151, "['dat']['le']")      # Landroid raw error byte
  304.     sub_device(154, "['cfg']['sc']['p']/2 + 50") # Landroid active percentage (0-100%)
  305.     sub_device(166, "['dat']['st']['d']")  # Landroid distance
  306.     sub_device(167, "['dat']['st']['wt']") # Landroid work time
  307.     

Editor

You can edit this paste and save as new:


File Description
  • Simple Python 2.7 script to control Worx Landroid via Domoticz
  • Paste Code
  • 11 May-2021
  • 9.98 Kb
You can Share it: