Examples

Sending and receiving data

Objectives

With this example you will learn how to send data to your network and receive data from it.

Code

Setting sink config

Objectives

With this example you will learn how to configure a sink remotely from your backend. This is especially useful when the sink was not configured when the gateway was produced.

Note

The script has hardcoded values that should be customized before being used.

Code

# Copyright 2021 Wirepas Ltd. All Rights Reserved.
#
# See file LICENSE.txt for full license details.
#
import argparse
from wirepas_mqtt_library import WirepasNetworkInterface
import wirepas_mesh_messaging as wmm
import logging

# Hardcoded config as an example
# Example sink settings applied by this script; adapt every value to your
# own network before use.
new_config = dict(
    # Sink with Low Latency flag: 0x11
    node_role=0x11,
    network_address=0x1AB2C3,
    network_channel=13,
    # Fake 16-byte keys to illustrate the concept
    cipher_key=bytes.fromhex("1122334455667788" * 2),
    authentication_key=bytes.fromhex("1122334455667788" * 2),
    started=True,
)

if __name__ == "__main__":
    # Configure a given sink on a gateway to the predefined settings held in
    # new_config, optionally overriding the sink address from the command line.
    # The process exits with status 1 if the sink is not found or if the
    # configuration could not be applied.
    parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
    # Mqtt parameters
    parser.add_argument('--host',
                        help="MQTT broker address")
    parser.add_argument('--port', default=8883,
                        type=int,
                        help="MQTT broker port")
    parser.add_argument('--username',
                        help="MQTT broker username")
    parser.add_argument('--password',
                        help="MQTT broker password")
    parser.add_argument('--insecure',
                        dest='insecure',
                        action='store_true',
                        help="MQTT use unsecured connection")

    parser.add_argument('--gateway',
                        required=True,
                        help="Gateway the sink to configure is attached")

    parser.add_argument('--sink',
                        required=True,
                        help="the sink to configure")

    parser.add_argument('--sink_address',
                        type=int,
                        help="the sink address. Other sink settings are hardcoded inside script")
    args = parser.parse_args()

    logging.basicConfig(format='%(levelname)s %(asctime)s %(message)s', level=logging.INFO)

    # Work on a copy so the module-level example config is left untouched
    sink_config = dict(new_config)
    if args.sink_address is not None:
        sink_config["node_address"] = args.sink_address

    wni = WirepasNetworkInterface(args.host,
                                  args.port,
                                  args.username,
                                  args.password,
                                  insecure=args.insecure)

    found = False
    configured = False
    for gw, sink, _ in wni.get_sinks():
        if gw != args.gateway or sink != args.sink:
            continue

        # Sink is found
        found = True
        print("Sink %s:%s is found" % (args.gateway, args.sink))

        try:
            res = wni.set_sink_config(gw, sink, sink_config)
        except TimeoutError:
            print("Cannot set new config to %s:%s" % (gw, sink))
        else:
            if res == wmm.GatewayResultCode.GW_RES_OK:
                configured = True
            else:
                print("Cannot set new config to %s:%s res=%s" % (gw, sink, res))

    if not found:
        print("Cannot find the sink to configure %s:%s" % (args.gateway, args.sink))
    elif configured:
        # Only claim success when the gateway actually accepted the config
        print("Sink %s:%s is configured" % (args.gateway, args.sink))

    if not configured:
        # Report the failure to the calling shell / script
        raise SystemExit(1)

Discovering nodes

Objectives

With this example you will learn how to discover the list of nodes in your network.

In this version, discovery is passive, so only nodes producing data will be discovered. The script runs for a specified period and then either prints the list of nodes found or saves it to a file.

Code

# Copyright 2021 Wirepas Ltd. All Rights Reserved.
#
# See file LICENSE.txt for full license details.
#
import argparse
from wirepas_mqtt_library import WirepasNetworkInterface
from time import sleep, strftime
import os

import logging

def save_node_list(nodes, output_folder):
    """Write node addresses to a timestamped file, one address per line.

    Args:
        nodes: iterable of integer node addresses.
        output_folder: folder receiving the file; created if it does not exist.

    Returns:
        Path of the file that was written.
    """
    # Create the folder up front so a missing directory does not waste the
    # whole discovery period
    os.makedirs(output_folder, exist_ok=True)
    path = os.path.join(output_folder, "list_nodes_" + strftime("%Y%m%d-%H%M%S"))
    with open(path, "w") as f:
        # Sorted so that the output is deterministic
        f.writelines("%d\n" % node for node in sorted(nodes))
    return path


if __name__ == "__main__":
    # Passively discover nodes: listen to network traffic for a given delay
    # and report every source address seen.
    parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
    parser.add_argument('--host',
                        help="MQTT broker address")
    parser.add_argument('--port', default=8883,
                        type=int,
                        help="MQTT broker port")
    parser.add_argument('--username', default='mqttmasteruser',
                        help="MQTT broker username")
    parser.add_argument('--password',
                        help="MQTT broker password")
    parser.add_argument('--insecure',
                        dest='insecure',
                        action='store_true',
                        help="MQTT use unsecured connection")

    parser.add_argument('--network',
                        type=int,
                        help="Network address to discover nodes from")

    parser.add_argument('--delay_s',
                        type=int,
                        help="Delay in seconds to listen for network traffic and discover nodes")

    parser.add_argument('--output_folder',
                        help="Folder to store file containing list of nodes at the end, "
                             "otherwise list is printed on standard output")

    args = parser.parse_args()

    logging.basicConfig(format='%(levelname)s %(asctime)s %(message)s', level=logging.INFO)

    # Validate the arguments before opening the broker connection and exit
    # with a non-zero status so that callers can detect the failure
    if args.network is None:
        print("No network address provided")
        raise SystemExit(1)

    if args.delay_s is None:
        print("No delay given to listen the network")
        raise SystemExit(1)

    wni = WirepasNetworkInterface(args.host,
                                  args.port,
                                  args.username,
                                  args.password,
                                  insecure=args.insecure)

    nodes = set()

    def on_data_rx(data):
        # Every received packet reveals the address of the node that sent it
        nodes.add(data.source_address)

    wni.register_data_cb(on_data_rx, network=args.network)

    print("Waiting for %d s to discover nodes" % args.delay_s)
    sleep(args.delay_s)

    # The callback stays registered and may still add to `nodes`, so report
    # and save a snapshot to keep the count and the list consistent
    discovered = nodes.copy()

    print("In %d s, %d nodes were found in network 0x%x" % (args.delay_s, len(discovered), args.network))

    if args.output_folder is not None:
        file = save_node_list(discovered, args.output_folder)
        print("Result written to %s" % file)
    else:
        print(discovered)

Making an otap

Objectives

With this example you will learn how to do an otap in your network (if all nodes are running at least version 5.1 of the Wirepas stack).

The code can be adapted depending on your use case to avoid the command parameter.

Code

# Copyright 2021 Wirepas Ltd. All Rights Reserved.
#
# See file LICENSE.txt for full license details.
#
import argparse
from wirepas_mqtt_library import WirepasNetworkInterface, WirepasOtapHelper
import wirepas_mesh_messaging as wmm
import logging
from random import choice

def pick_unused_sequence(used_sequences):
    """Pick a random scratchpad sequence in 1..254 that is not already used.

    Args:
        used_sequences: container of sequence numbers to avoid.

    Returns:
        An integer between 1 and 254 (both included) not in used_sequences.

    Raises:
        ValueError: if every sequence from 1 to 254 is already used.
    """
    # range() excludes its upper bound: 255 is needed to include 254
    candidates = [seq for seq in range(1, 255) if seq not in used_sequences]
    if not candidates:
        raise ValueError("No unused scratchpad sequence available")
    return choice(candidates)


if __name__ == "__main__":
    # Perform an otap operation selected by the positional "cmd" argument.
    # The process exits with status 1 as soon as a step fails.
    parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
    # Mqtt parameters
    parser.add_argument('--host',
                        help="MQTT broker address")
    parser.add_argument('--port', default=8883,
                        type=int,
                        help="MQTT broker port")
    parser.add_argument('--username',
                        help="MQTT broker username")
    parser.add_argument('--password',
                        help="MQTT broker password")
    parser.add_argument('--insecure',
                        dest='insecure',
                        action='store_true',
                        help="MQTT use unsecured connection")

    # Otap parameters
    parser.add_argument("cmd", choices=["sink_only",
                                        "propagate_only",
                                        "immediately",
                                        "delayed",
                                        "update_delay",
                                        "no_otap"])

    parser.add_argument('--network',
                        type=int,
                        help="Network address concerned by scratchpad")
    parser.add_argument('--file',
                        help="Scratcphad to use")
    parser.add_argument("--gateway",
                        type=str,
                        nargs='+',
                        help="Gateway list to use. If specified, the OTAP will be performed on given Gateway IDs",
                        default=None)
    args = parser.parse_args()

    logging.basicConfig(format='%(levelname)s %(asctime)s %(message)s', level=logging.WARNING)

    # Validate the arguments before opening the broker connection
    if args.network is None:
        print("No network address provided")
        raise SystemExit(1)

    # These two commands only change the target; they do not load a scratchpad
    needs_scratchpad = args.cmd not in ("update_delay", "no_otap")
    if needs_scratchpad and args.file is None:
        print("No scratchpad file provided")
        raise SystemExit(1)

    wni = WirepasNetworkInterface(args.host,
                                  args.port,
                                  args.username,
                                  args.password,
                                  insecure=args.insecure)

    otapHelper = WirepasOtapHelper(wni,
                                   args.network,
                                   args.gateway)

    if args.cmd == "update_delay":
        delay = wmm.ProcessingDelay.DELAY_THIRTY_MINUTES
        print("Set new delay to %s" % delay)
        if not otapHelper.set_propagate_and_process_scratchpad_to_all_sinks(delay=delay):
            print("Cannot update delay")
            raise SystemExit(1)
        raise SystemExit(0)

    if args.cmd == "no_otap":
        print("Set target to no otap")
        if not otapHelper.set_no_otap_to_all_sinks():
            print("Cannot set no otap on all sinks")
            raise SystemExit(1)
        raise SystemExit(0)

    # Optional: find a "good" sequence
    current_target_seq_set = otapHelper.get_target_scratchpad_seq_list()

    print("Sequences already in used: ", current_target_seq_set)
    # Take a sequence from 1-254 that is not in the current set
    try:
        seq = pick_unused_sequence(current_target_seq_set)
    except ValueError as e:
        print(e)
        raise SystemExit(1)

    print("Sequence chosen: ", seq)

    if not otapHelper.load_scratchpad_to_all_sinks(args.file, seq):
        print("Cannot load scratchpad to all sinks")
        # Do not process or propagate a scratchpad that is not on every sink
        raise SystemExit(1)

    success = True
    if args.cmd == "sink_only":
        print("Processing scratchpad on all sinks")
        if not otapHelper.process_scratchpad_on_all_sinks():
            print("Cannot process scratchpad on all sinks")
            success = False
    elif args.cmd == "propagate_only":
        print("Set propagate only")
        if not otapHelper.set_propagate_scratchpad_to_all_sinks():
            print("Cannot set propagate only")
            success = False
    elif args.cmd == "immediately":
        print("Set propagate and process")
        if not otapHelper.set_propagate_and_process_scratchpad_to_all_sinks():
            print("Cannot set propagate and process")
            success = False
    elif args.cmd == "delayed":
        print("Set propagate and process with 5 days delay")
        if not otapHelper.set_propagate_and_process_scratchpad_to_all_sinks(delay=wmm.ProcessingDelay.DELAY_FIVE_DAYS):
            print("Cannot set propagate and process only for 5 days")
            success = False

    if not success:
        raise SystemExit(1)