| #!/usr/bin/python3 |
| # |
| # Example nfcpy to wpa_supplicant wrapper for DPP NFC operations |
| # Copyright (c) 2012-2013, Jouni Malinen <j@w1.fi> |
| # Copyright (c) 2019-2020, The Linux Foundation |
| # |
| # This software may be distributed under the terms of the BSD license. |
| # See README for more details. |
| |
| import binascii |
| import errno |
| import os |
| import struct |
| import sys |
| import time |
| import threading |
| import argparse |
| |
| import nfc |
| import ndef |
| |
| import logging |
| |
| scriptsdir = os.path.dirname(os.path.realpath(sys.modules[__name__].__file__)) |
| sys.path.append(os.path.join(scriptsdir, '..', '..', 'wpaspy')) |
| import wpaspy |
| |
| wpas_ctrl = '/var/run/wpa_supplicant' |
| ifname = None |
| init_on_touch = False |
| in_raw_mode = False |
| prev_tcgetattr = 0 |
| no_input = False |
| continue_loop = True |
| terminate_now = False |
| summary_file = None |
| success_file = None |
| netrole = None |
| operation_success = False |
| mutex = threading.Lock() |
| |
| C_NORMAL = '\033[0m' |
| C_RED = '\033[91m' |
| C_GREEN = '\033[92m' |
| C_YELLOW = '\033[93m' |
| C_BLUE = '\033[94m' |
| C_MAGENTA = '\033[95m' |
| C_CYAN = '\033[96m' |
| |
| def summary(txt, color=None): |
| with mutex: |
| if color: |
| print(color + txt + C_NORMAL) |
| else: |
| print(txt) |
| if summary_file: |
| with open(summary_file, 'a') as f: |
| f.write(txt + "\n") |
| |
| def success_report(txt): |
| summary(txt) |
| if success_file: |
| with open(success_file, 'a') as f: |
| f.write(txt + "\n") |
| |
| def wpas_connect(): |
| ifaces = [] |
| if os.path.isdir(wpas_ctrl): |
| try: |
| ifaces = [os.path.join(wpas_ctrl, i) for i in os.listdir(wpas_ctrl)] |
| except OSError as error: |
| summary("Could not find wpa_supplicant: %s", str(error)) |
| return None |
| |
| if len(ifaces) < 1: |
| summary("No wpa_supplicant control interface found") |
| return None |
| |
| for ctrl in ifaces: |
| if ifname and ifname not in ctrl: |
| continue |
| if os.path.basename(ctrl).startswith("p2p-dev-"): |
| # skip P2P management interface |
| continue |
| try: |
| summary("Trying to use control interface " + ctrl) |
| wpas = wpaspy.Ctrl(ctrl) |
| return wpas |
| except Exception as e: |
| pass |
| summary("Could not connect to wpa_supplicant") |
| return None |
| |
| def dpp_nfc_uri_process(uri): |
| wpas = wpas_connect() |
| if wpas is None: |
| return False |
| peer_id = wpas.request("DPP_NFC_URI " + uri) |
| if "FAIL" in peer_id: |
| summary("Could not parse DPP URI from NFC URI record", color=C_RED) |
| return False |
| peer_id = int(peer_id) |
| summary("peer_id=%d for URI from NFC Tag: %s" % (peer_id, uri)) |
| cmd = "DPP_AUTH_INIT peer=%d" % peer_id |
| global enrollee_only, configurator_only, config_params |
| if enrollee_only: |
| cmd += " role=enrollee" |
| elif configurator_only: |
| cmd += " role=configurator" |
| if config_params: |
| cmd += " " + config_params |
| summary("Initiate DPP authentication: " + cmd) |
| res = wpas.request(cmd) |
| if "OK" not in res: |
| summary("Failed to initiate DPP Authentication", color=C_RED) |
| return False |
| summary("DPP Authentication initiated") |
| return True |
| |
| def dpp_hs_tag_read(record): |
| wpas = wpas_connect() |
| if wpas is None: |
| return False |
| summary(record) |
| if len(record.data) < 5: |
| summary("Too short DPP HS", color=C_RED) |
| return False |
| if record.data[0] != 0: |
| summary("Unexpected URI Identifier Code", color=C_RED) |
| return False |
| uribuf = record.data[1:] |
| try: |
| uri = uribuf.decode() |
| except: |
| summary("Invalid URI payload", color=C_RED) |
| return False |
| summary("URI: " + uri) |
| if not uri.startswith("DPP:"): |
| summary("Not a DPP URI", color=C_RED) |
| return False |
| return dpp_nfc_uri_process(uri) |
| |
| def get_status(wpas, extra=None): |
| if extra: |
| extra = "-" + extra |
| else: |
| extra = "" |
| res = wpas.request("STATUS" + extra) |
| lines = res.splitlines() |
| vals = dict() |
| for l in lines: |
| try: |
| [name, value] = l.split('=', 1) |
| except ValueError: |
| summary("Ignore unexpected status line: %s" % l) |
| continue |
| vals[name] = value |
| return vals |
| |
| def get_status_field(wpas, field, extra=None): |
| vals = get_status(wpas, extra) |
| if field in vals: |
| return vals[field] |
| return None |
| |
| def own_addr(wpas): |
| addr = get_status_field(wpas, "address") |
| if addr is None: |
| addr = get_status_field(wpas, "bssid[0]") |
| return addr |
| |
| def dpp_bootstrap_gen(wpas, type="qrcode", chan=None, mac=None, info=None, |
| curve=None, key=None): |
| cmd = "DPP_BOOTSTRAP_GEN type=" + type |
| if chan: |
| cmd += " chan=" + chan |
| if mac: |
| if mac is True: |
| mac = own_addr(wpas) |
| if mac is None: |
| summary("Could not determine local MAC address for bootstrap info") |
| else: |
| cmd += " mac=" + mac.replace(':', '') |
| if info: |
| cmd += " info=" + info |
| if curve: |
| cmd += " curve=" + curve |
| if key: |
| cmd += " key=" + key |
| res = wpas.request(cmd) |
| if "FAIL" in res: |
| raise Exception("Failed to generate bootstrapping info") |
| return int(res) |
| |
| def dpp_start_listen(wpas, freq): |
| if get_status_field(wpas, "bssid[0]"): |
| summary("Own AP freq: %s MHz" % str(get_status_field(wpas, "freq"))) |
| if get_status_field(wpas, "beacon_set", extra="DRIVER") is None: |
| summary("Enable beaconing to have radio ready for RX") |
| wpas.request("DISABLE") |
| wpas.request("SET start_disabled 0") |
| wpas.request("ENABLE") |
| cmd = "DPP_LISTEN %d" % freq |
| global enrollee_only |
| global configurator_only |
| if enrollee_only: |
| cmd += " role=enrollee" |
| elif configurator_only: |
| cmd += " role=configurator" |
| global netrole |
| if netrole: |
| cmd += " netrole=" + netrole |
| summary(cmd) |
| res = wpas.request(cmd) |
| if "OK" not in res: |
| summary("Failed to start DPP listen", color=C_RED) |
| return False |
| return True |
| |
| def wpas_get_nfc_uri(start_listen=True, pick_channel=False, chan_override=None): |
| listen_freq = 2412 |
| wpas = wpas_connect() |
| if wpas is None: |
| return None |
| global own_id, chanlist |
| if chan_override: |
| chan = chan_override |
| else: |
| chan = chanlist |
| if chan and chan.startswith("81/"): |
| listen_freq = int(chan[3:].split(',')[0]) * 5 + 2407 |
| if chan is None and get_status_field(wpas, "bssid[0]"): |
| freq = get_status_field(wpas, "freq") |
| if freq: |
| freq = int(freq) |
| if freq >= 2412 and freq <= 2462: |
| chan = "81/%d" % ((freq - 2407) / 5) |
| summary("Use current AP operating channel (%d MHz) as the URI channel list (%s)" % (freq, chan)) |
| listen_freq = freq |
| if chan is None and pick_channel: |
| chan = "81/6" |
| summary("Use channel 2437 MHz since no other preference provided") |
| listen_freq = 2437 |
| own_id = dpp_bootstrap_gen(wpas, type="nfc-uri", chan=chan, mac=True) |
| res = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % own_id).rstrip() |
| if "FAIL" in res: |
| return None |
| if start_listen: |
| if not dpp_start_listen(wpas, listen_freq): |
| raise Exception("Failed to start listen operation on %d MHz" % listen_freq) |
| return res |
| |
| def wpas_report_handover_req(uri): |
| wpas = wpas_connect() |
| if wpas is None: |
| return None |
| global own_id |
| cmd = "DPP_NFC_HANDOVER_REQ own=%d uri=%s" % (own_id, uri) |
| return wpas.request(cmd) |
| |
| def wpas_report_handover_sel(uri): |
| wpas = wpas_connect() |
| if wpas is None: |
| return None |
| global own_id |
| cmd = "DPP_NFC_HANDOVER_SEL own=%d uri=%s" % (own_id, uri) |
| return wpas.request(cmd) |
| |
| def dpp_handover_client(handover, alt=False): |
| summary("About to start run_dpp_handover_client (alt=%s)" % str(alt)) |
| if alt: |
| handover.i_m_selector = False |
| run_dpp_handover_client(handover, alt) |
| summary("Done run_dpp_handover_client (alt=%s)" % str(alt)) |
| |
| def run_client_alt(handover, alt): |
| if handover.start_client_alt and not alt: |
| handover.start_client_alt = False |
| summary("Try to send alternative handover request") |
| dpp_handover_client(handover, alt=True) |
| |
| class HandoverClient(nfc.handover.HandoverClient): |
| def __init__(self, handover, llc): |
| super(HandoverClient, self).__init__(llc) |
| self.handover = handover |
| |
| def recv_records(self, timeout=None): |
| msg = self.recv_octets(timeout) |
| if msg is None: |
| return None |
| records = list(ndef.message_decoder(msg, 'relax')) |
| if records and records[0].type == 'urn:nfc:wkt:Hs': |
| summary("Handover client received message '{0}'".format(records[0].type)) |
| return list(ndef.message_decoder(msg, 'relax')) |
| summary("Handover client received invalid message: %s" + binascii.hexlify(msg)) |
| return None |
| |
| def recv_octets(self, timeout=None): |
| start = time.time() |
| msg = bytearray() |
| while True: |
| poll_timeout = 0.1 if timeout is None or timeout > 0.1 else timeout |
| if not self.socket.poll('recv', poll_timeout): |
| if timeout: |
| timeout -= time.time() - start |
| if timeout <= 0: |
| return None |
| start = time.time() |
| continue |
| try: |
| r = self.socket.recv() |
| if r is None: |
| return None |
| msg += r |
| except TypeError: |
| return b'' |
| try: |
| list(ndef.message_decoder(msg, 'strict', {})) |
| return bytes(msg) |
| except ndef.DecodeError: |
| if timeout: |
| timeout -= time.time() - start |
| if timeout <= 0: |
| return None |
| start = time.time() |
| continue |
| return None |
| |
| def run_dpp_handover_client(handover, alt=False): |
| chan_override = None |
| if alt: |
| chan_override = handover.altchanlist |
| handover.alt_proposal_used = True |
| global test_uri, test_alt_uri |
| if test_uri: |
| summary("TEST MODE: Using specified URI (alt=%s)" % str(alt)) |
| uri = test_alt_uri if alt else test_uri |
| else: |
| uri = wpas_get_nfc_uri(start_listen=False, chan_override=chan_override) |
| if uri is None: |
| summary("Cannot start handover client - no bootstrap URI available", |
| color=C_RED) |
| return |
| handover.my_uri = uri |
| uri = ndef.UriRecord(uri) |
| summary("NFC URI record for DPP: " + str(uri)) |
| carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data) |
| global test_crn |
| if test_crn: |
| prev, = struct.unpack('>H', test_crn) |
| summary("TEST MODE: Use specified crn %d" % prev) |
| crn = test_crn |
| test_crn = struct.pack('>H', prev + 0x10) |
| else: |
| crn = os.urandom(2) |
| hr = ndef.HandoverRequestRecord(version="1.4", crn=crn) |
| hr.add_alternative_carrier('active', carrier.name) |
| message = [hr, carrier] |
| summary("NFC Handover Request message for DPP: " + str(message)) |
| |
| if handover.peer_crn is not None and not alt: |
| summary("NFC handover request from peer was already received - do not send own[1]") |
| return |
| if handover.client: |
| summary("Use already started handover client") |
| client = handover.client |
| else: |
| summary("Start handover client") |
| client = HandoverClient(handover, handover.llc) |
| try: |
| summary("Trying to initiate NFC connection handover") |
| client.connect() |
| summary("Connected for handover") |
| except nfc.llcp.ConnectRefused: |
| summary("Handover connection refused") |
| client.close() |
| return |
| except Exception as e: |
| summary("Other exception: " + str(e)) |
| client.close() |
| return |
| handover.client = client |
| |
| if handover.peer_crn is not None and not alt: |
| summary("NFC handover request from peer was already received - do not send own[2] (except alt)") |
| run_client_alt(handover, alt) |
| return |
| |
| summary("Sending handover request") |
| |
| handover.my_crn_ready = True |
| |
| if not client.send_records(message): |
| handover.my_crn_ready = False |
| summary("Failed to send handover request", color=C_RED) |
| run_client_alt(handover, alt) |
| return |
| |
| handover.my_crn, = struct.unpack('>H', crn) |
| |
| summary("Receiving handover response") |
| try: |
| start = time.time() |
| message = client.recv_records(timeout=3.0) |
| end = time.time() |
| summary("Received {} record(s) in {} seconds".format(len(message) if message is not None else -1, end - start)) |
| except Exception as e: |
| # This is fine if we are the handover selector |
| if handover.hs_sent: |
| summary("Client receive failed as expected since I'm the handover server: %s" % str(e)) |
| elif handover.alt_proposal_used and not alt: |
| summary("Client received failed for initial proposal as expected since alternative proposal was also used: %s" % str(e)) |
| else: |
| summary("Client receive failed: %s" % str(e), color=C_RED) |
| message = None |
| if message is None: |
| if handover.hs_sent: |
| summary("No response received as expected since I'm the handover server") |
| elif handover.alt_proposal_used and not alt: |
| summary("No response received for initial proposal as expected since alternative proposal was also used") |
| elif handover.try_own and not alt: |
| summary("No response received for initial proposal as expected since alternative proposal will also be sent") |
| else: |
| summary("No response received", color=C_RED) |
| run_client_alt(handover, alt) |
| return |
| summary("Received message: " + str(message)) |
| if len(message) < 1 or \ |
| not isinstance(message[0], ndef.HandoverSelectRecord): |
| summary("Response was not Hs - received: " + message.type) |
| return |
| |
| summary("Received handover select message") |
| summary("alternative carriers: " + str(message[0].alternative_carriers)) |
| if handover.i_m_selector: |
| summary("Ignore the received select since I'm the handover selector") |
| run_client_alt(handover, alt) |
| return |
| |
| if handover.alt_proposal_used and not alt: |
| summary("Ignore received handover select for the initial proposal since alternative proposal was sent") |
| client.close() |
| return |
| |
| dpp_found = False |
| for carrier in message: |
| if isinstance(carrier, ndef.HandoverSelectRecord): |
| continue |
| summary("Remote carrier type: " + carrier.type) |
| if carrier.type == "application/vnd.wfa.dpp": |
| if len(carrier.data) == 0 or carrier.data[0] != 0: |
| summary("URI Identifier Code 'None' not seen", color=C_RED) |
| continue |
| summary("DPP carrier type match - send to wpa_supplicant") |
| dpp_found = True |
| uri = carrier.data[1:].decode("utf-8") |
| summary("DPP URI: " + uri) |
| handover.peer_uri = uri |
| if test_uri: |
| summary("TEST MODE: Fake processing") |
| break |
| res = wpas_report_handover_sel(uri) |
| if res is None or "FAIL" in res: |
| summary("DPP handover report rejected", color=C_RED) |
| break |
| |
| success_report("DPP handover reported successfully (initiator)") |
| summary("peer_id=" + res) |
| peer_id = int(res) |
| wpas = wpas_connect() |
| if wpas is None: |
| break |
| |
| global enrollee_only |
| global config_params |
| if enrollee_only: |
| extra = " role=enrollee" |
| elif config_params: |
| extra = " role=configurator " + config_params |
| else: |
| # TODO: Single Configurator instance |
| res = wpas.request("DPP_CONFIGURATOR_ADD") |
| if "FAIL" in res: |
| summary("Failed to initiate Configurator", color=C_RED) |
| break |
| conf_id = int(res) |
| extra = " conf=sta-dpp configurator=%d" % conf_id |
| global own_id |
| summary("Initiate DPP authentication") |
| cmd = "DPP_AUTH_INIT peer=%d own=%d" % (peer_id, own_id) |
| cmd += extra |
| res = wpas.request(cmd) |
| if "FAIL" in res: |
| summary("Failed to initiate DPP authentication", color=C_RED) |
| break |
| |
| if not dpp_found and handover.no_alt_proposal: |
| summary("DPP carrier not seen in response - do not allow alternative proposal anymore") |
| elif not dpp_found: |
| summary("DPP carrier not seen in response - allow peer to initiate a new handover with different parameters") |
| handover.alt_proposal = True |
| handover.my_crn_ready = False |
| handover.my_crn = None |
| handover.peer_crn = None |
| handover.hs_sent = False |
| summary("Returning from dpp_handover_client") |
| return |
| |
| summary("Remove peer") |
| handover.close() |
| summary("Done with handover") |
| global only_one |
| if only_one: |
| print("only_one -> stop loop") |
| global continue_loop |
| continue_loop = False |
| |
| global no_wait |
| if no_wait or only_one: |
| summary("Trying to exit..") |
| global terminate_now |
| terminate_now = True |
| |
| summary("Returning from dpp_handover_client") |
| |
| class HandoverServer(nfc.handover.HandoverServer): |
| def __init__(self, handover, llc): |
| super(HandoverServer, self).__init__(llc) |
| self.sent_carrier = None |
| self.ho_server_processing = False |
| self.success = False |
| self.llc = llc |
| self.handover = handover |
| |
| def serve(self, socket): |
| peer_sap = socket.getpeername() |
| summary("Serving handover client on remote sap {0}".format(peer_sap)) |
| send_miu = socket.getsockopt(nfc.llcp.SO_SNDMIU) |
| try: |
| while socket.poll("recv"): |
| req = bytearray() |
| while socket.poll("recv"): |
| r = socket.recv() |
| if r is None: |
| return None |
| summary("Received %d octets" % len(r)) |
| req += r |
| if len(req) == 0: |
| continue |
| try: |
| list(ndef.message_decoder(req, 'strict', {})) |
| except ndef.DecodeError: |
| continue |
| summary("Full message received") |
| resp = self._process_request_data(req) |
| if resp is None or len(resp) == 0: |
| summary("No handover select to send out - wait for a possible alternative handover request") |
| handover.alt_proposal = True |
| req = bytearray() |
| continue |
| |
| for offset in range(0, len(resp), send_miu): |
| if not socket.send(resp[offset:offset + send_miu]): |
| summary("Failed to send handover select - connection closed") |
| return |
| summary("Sent out full handover select") |
| if handover.terminate_on_hs_send_completion: |
| handover.delayed_exit() |
| |
| except nfc.llcp.Error as e: |
| global terminate_now |
| summary("HandoverServer exception: %s" % e, |
| color=None if e.errno == errno.EPIPE or terminate_now else C_RED) |
| finally: |
| socket.close() |
| summary("Handover serve thread exiting") |
| |
| def process_handover_request_message(self, records): |
| handover = self.handover |
| self.ho_server_processing = True |
| global in_raw_mode |
| was_in_raw_mode = in_raw_mode |
| clear_raw_mode() |
| if was_in_raw_mode: |
| print("\n") |
| summary("HandoverServer - request received: " + str(records)) |
| |
| for carrier in records: |
| if not isinstance(carrier, ndef.HandoverRequestRecord): |
| continue |
| if carrier.collision_resolution_number: |
| handover.peer_crn = carrier.collision_resolution_number |
| summary("peer_crn: %d" % handover.peer_crn) |
| |
| if handover.my_crn is None and handover.my_crn_ready: |
| summary("Still trying to send own handover request - wait a moment to see if that succeeds before checking crn values") |
| for i in range(10): |
| if handover.my_crn is not None: |
| break |
| time.sleep(0.01) |
| if handover.my_crn is not None: |
| summary("my_crn: %d" % handover.my_crn) |
| |
| if handover.my_crn is not None and handover.peer_crn is not None: |
| if handover.my_crn == handover.peer_crn: |
| summary("Same crn used - automatic collision resolution failed") |
| # TODO: Should generate a new Handover Request message |
| return '' |
| if ((handover.my_crn & 1) == (handover.peer_crn & 1) and \ |
| handover.my_crn > handover.peer_crn) or \ |
| ((handover.my_crn & 1) != (handover.peer_crn & 1) and \ |
| handover.my_crn < handover.peer_crn): |
| summary("I'm the Handover Selector Device") |
| handover.i_m_selector = True |
| else: |
| summary("Peer is the Handover Selector device") |
| summary("Ignore the received request.") |
| return '' |
| |
| hs = ndef.HandoverSelectRecord('1.4') |
| sel = [hs] |
| |
| found = False |
| |
| for carrier in records: |
| if isinstance(carrier, ndef.HandoverRequestRecord): |
| continue |
| summary("Remote carrier type: " + carrier.type) |
| if carrier.type == "application/vnd.wfa.dpp": |
| summary("DPP carrier type match - add DPP carrier record") |
| if len(carrier.data) == 0 or carrier.data[0] != 0: |
| summary("URI Identifier Code 'None' not seen", color=C_RED) |
| continue |
| uri = carrier.data[1:].decode("utf-8") |
| summary("Received DPP URI: " + uri) |
| |
| global test_uri, test_alt_uri |
| if test_uri: |
| summary("TEST MODE: Using specified URI") |
| data = test_sel_uri if test_sel_uri else test_uri |
| elif handover.alt_proposal and handover.altchanlist: |
| summary("Use alternative channel list while processing alternative proposal from peer") |
| data = wpas_get_nfc_uri(start_listen=False, |
| chan_override=handover.altchanlist, |
| pick_channel=True) |
| else: |
| data = wpas_get_nfc_uri(start_listen=False, |
| pick_channel=True) |
| summary("Own URI (pre-processing): %s" % data) |
| |
| if test_uri: |
| summary("TEST MODE: Fake processing") |
| res = "OK" |
| data += " [%s]" % uri |
| else: |
| res = wpas_report_handover_req(uri) |
| if res is None or "FAIL" in res: |
| summary("DPP handover request processing failed", |
| color=C_RED) |
| if handover.altchanlist: |
| data = wpas_get_nfc_uri(start_listen=False, |
| chan_override=handover.altchanlist) |
| summary("Own URI (try another channel list): %s" % data) |
| continue |
| |
| if test_alt_uri: |
| summary("TEST MODE: Reject initial proposal") |
| continue |
| |
| found = True |
| |
| if not test_uri: |
| wpas = wpas_connect() |
| if wpas is None: |
| continue |
| global own_id |
| data = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % own_id).rstrip() |
| if "FAIL" in data: |
| continue |
| summary("Own URI (post-processing): %s" % data) |
| handover.my_uri = data |
| handover.peer_uri = uri |
| uri = ndef.UriRecord(data) |
| summary("Own bootstrapping NFC URI record: " + str(uri)) |
| |
| if not test_uri: |
| info = wpas.request("DPP_BOOTSTRAP_INFO %d" % own_id) |
| freq = None |
| for line in info.splitlines(): |
| if line.startswith("use_freq="): |
| freq = int(line.split('=')[1]) |
| if freq is None or freq == 0: |
| summary("No channel negotiated over NFC - use channel 6") |
| freq = 2437 |
| else: |
| summary("Negotiated channel: %d MHz" % freq) |
| if not dpp_start_listen(wpas, freq): |
| break |
| |
| carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data) |
| summary("Own DPP carrier record: " + str(carrier)) |
| hs.add_alternative_carrier('active', carrier.name) |
| sel = [hs, carrier] |
| break |
| |
| summary("Sending handover select: " + str(sel)) |
| if found: |
| summary("Handover completed successfully") |
| handover.terminate_on_hs_send_completion = True |
| self.success = True |
| handover.hs_sent = True |
| handover.i_m_selector = True |
| elif handover.no_alt_proposal: |
| summary("Do not try alternative proposal anymore - handover failed", |
| color=C_RED) |
| handover.hs_sent = True |
| else: |
| summary("Try to initiate with alternative parameters") |
| handover.try_own = True |
| handover.hs_sent = False |
| handover.no_alt_proposal = True |
| if handover.client_thread: |
| handover.start_client_alt = True |
| else: |
| handover.client_thread = threading.Thread(target=llcp_worker, |
| args=(self.llc, True)) |
| handover.client_thread.start() |
| return sel |
| |
| def clear_raw_mode(): |
| import sys, tty, termios |
| global prev_tcgetattr, in_raw_mode |
| if not in_raw_mode: |
| return |
| fd = sys.stdin.fileno() |
| termios.tcsetattr(fd, termios.TCSADRAIN, prev_tcgetattr) |
| in_raw_mode = False |
| |
| def getch(): |
| import sys, tty, termios, select |
| global prev_tcgetattr, in_raw_mode |
| fd = sys.stdin.fileno() |
| prev_tcgetattr = termios.tcgetattr(fd) |
| ch = None |
| try: |
| tty.setraw(fd) |
| in_raw_mode = True |
| [i, o, e] = select.select([fd], [], [], 0.05) |
| if i: |
| ch = sys.stdin.read(1) |
| finally: |
| termios.tcsetattr(fd, termios.TCSADRAIN, prev_tcgetattr) |
| in_raw_mode = False |
| return ch |
| |
| def dpp_tag_read(tag): |
| success = False |
| for record in tag.ndef.records: |
| summary(record) |
| summary("record type " + record.type) |
| if record.type == "application/vnd.wfa.dpp": |
| summary("DPP HS tag - send to wpa_supplicant") |
| success = dpp_hs_tag_read(record) |
| break |
| if isinstance(record, ndef.UriRecord): |
| summary("URI record: uri=" + record.uri) |
| summary("URI record: iri=" + record.iri) |
| if record.iri.startswith("DPP:"): |
| summary("DPP URI") |
| if not dpp_nfc_uri_process(record.iri): |
| break |
| success = True |
| else: |
| summary("Ignore unknown URI") |
| break |
| |
| if success: |
| success_report("Tag read succeeded") |
| |
| return success |
| |
| def rdwr_connected_write_tag(tag): |
| summary("Tag found - writing - " + str(tag)) |
| if not tag.ndef: |
| summary("Not a formatted NDEF tag", color=C_RED) |
| return |
| if not tag.ndef.is_writeable: |
| summary("Not a writable tag", color=C_RED) |
| return |
| global dpp_tag_data |
| if tag.ndef.capacity < len(dpp_tag_data): |
| summary("Not enough room for the message") |
| return |
| try: |
| tag.ndef.records = dpp_tag_data |
| except ValueError as e: |
| summary("Writing the tag failed: %s" % str(e), color=C_RED) |
| return |
| success_report("Tag write succeeded") |
| summary("Tag writing completed - remove tag", color=C_GREEN) |
| global only_one, operation_success |
| operation_success = True |
| if only_one: |
| global continue_loop |
| continue_loop = False |
| global dpp_sel_wait_remove |
| return dpp_sel_wait_remove |
| |
| def write_nfc_uri(clf, wait_remove=True): |
| summary("Write NFC URI record") |
| data = wpas_get_nfc_uri() |
| if data is None: |
| summary("Could not get NFC URI from wpa_supplicant", color=C_RED) |
| return |
| |
| global dpp_sel_wait_remove |
| dpp_sel_wait_remove = wait_remove |
| summary("URI: %s" % data) |
| uri = ndef.UriRecord(data) |
| summary(uri) |
| |
| summary("Touch an NFC tag to write URI record", color=C_CYAN) |
| global dpp_tag_data |
| dpp_tag_data = [uri] |
| clf.connect(rdwr={'on-connect': rdwr_connected_write_tag}) |
| |
| def write_nfc_hs(clf, wait_remove=True): |
| summary("Write NFC Handover Select record on a tag") |
| data = wpas_get_nfc_uri() |
| if data is None: |
| summary("Could not get NFC URI from wpa_supplicant", color=C_RED) |
| return |
| |
| global dpp_sel_wait_remove |
| dpp_sel_wait_remove = wait_remove |
| summary("URI: %s" % data) |
| uri = ndef.UriRecord(data) |
| summary(uri) |
| carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data) |
| hs = ndef.HandoverSelectRecord('1.4') |
| hs.add_alternative_carrier('active', carrier.name) |
| summary(hs) |
| summary(carrier) |
| |
| summary("Touch an NFC tag to write HS record", color=C_CYAN) |
| global dpp_tag_data |
| dpp_tag_data = [hs, carrier] |
| summary(dpp_tag_data) |
| clf.connect(rdwr={'on-connect': rdwr_connected_write_tag}) |
| |
| def rdwr_connected(tag): |
| global only_one, no_wait |
| summary("Tag connected: " + str(tag)) |
| |
| if tag.ndef: |
| summary("NDEF tag: " + tag.type) |
| summary(tag.ndef.records) |
| success = dpp_tag_read(tag) |
| if only_one and success: |
| global continue_loop |
| continue_loop = False |
| else: |
| summary("Not an NDEF tag - remove tag", color=C_RED) |
| return True |
| |
| return not no_wait |
| |
| def llcp_worker(llc, try_alt): |
| global handover |
| print("Start of llcp_worker()") |
| if try_alt: |
| summary("Starting handover client (try_alt)") |
| dpp_handover_client(handover, alt=True) |
| summary("Exiting llcp_worker thread (try_alt)") |
| return |
| global init_on_touch |
| if init_on_touch: |
| summary("Starting handover client (init_on_touch)") |
| dpp_handover_client(handover) |
| summary("llcp_worker init_on_touch processing completed: try_own={} hs_sent={} no_alt_proposal={} start_client_alt={}".format(handover.try_own, handover.hs_sent, handover.no_alt_proposal, handover.start_client_alt)) |
| if handover.start_client_alt and not handover.hs_sent: |
| summary("Try alternative handover request before exiting llcp_worker") |
| handover.start_client_alt = False |
| dpp_handover_client(handover, alt=True) |
| summary("Exiting llcp_worker thread (init_on_touch)") |
| return |
| |
| global no_input |
| if no_input: |
| summary("Wait for handover to complete") |
| else: |
| print("Wait for handover to complete - press 'i' to initiate") |
| while not handover.wait_connection and handover.srv.sent_carrier is None: |
| if handover.try_own: |
| handover.try_own = False |
| summary("Try to initiate another handover with own parameters") |
| handover.my_crn_ready = False |
| handover.my_crn = None |
| handover.peer_crn = None |
| handover.hs_sent = False |
| dpp_handover_client(handover, alt=True) |
| summary("Exiting llcp_worker thread (retry with own parameters)") |
| return |
| if handover.srv.ho_server_processing: |
| time.sleep(0.025) |
| elif no_input: |
| time.sleep(0.5) |
| else: |
| res = getch() |
| if res != 'i': |
| continue |
| clear_raw_mode() |
| summary("Starting handover client") |
| dpp_handover_client(handover) |
| summary("Exiting llcp_worker thread (manual init)") |
| return |
| |
| global in_raw_mode |
| was_in_raw_mode = in_raw_mode |
| clear_raw_mode() |
| if was_in_raw_mode: |
| print("\r") |
| summary("Exiting llcp_worker thread") |
| |
| class ConnectionHandover(): |
| def __init__(self): |
| self.client = None |
| self.client_thread = None |
| self.reset() |
| self.exit_thread = None |
| |
| def reset(self): |
| self.wait_connection = False |
| self.my_crn_ready = False |
| self.my_crn = None |
| self.peer_crn = None |
| self.hs_sent = False |
| self.no_alt_proposal = False |
| self.alt_proposal_used = False |
| self.i_m_selector = False |
| self.start_client_alt = False |
| self.terminate_on_hs_send_completion = False |
| self.try_own = False |
| self.my_uri = None |
| self.peer_uri = None |
| self.connected = False |
| self.alt_proposal = False |
| |
| def start_handover_server(self, llc): |
| summary("Start handover server") |
| self.llc = llc |
| self.srv = HandoverServer(self, llc) |
| |
| def close(self): |
| if self.client: |
| self.client.close() |
| self.client = None |
| |
| def run_delayed_exit(self): |
| summary("Trying to exit (delayed)..") |
| time.sleep(0.25) |
| summary("Trying to exit (after wait)..") |
| global terminate_now |
| terminate_now = True |
| |
| def delayed_exit(self): |
| global only_one |
| if only_one: |
| self.exit_thread = threading.Thread(target=self.run_delayed_exit) |
| self.exit_thread.start() |
| |
| def llcp_startup(llc): |
| global handover |
| handover.start_handover_server(llc) |
| return llc |
| |
| def llcp_connected(llc): |
| summary("P2P LLCP connected") |
| global handover |
| handover.connected = True |
| handover.srv.start() |
| if init_on_touch or not no_input: |
| handover.client_thread = threading.Thread(target=llcp_worker, |
| args=(llc, False)) |
| handover.client_thread.start() |
| return True |
| |
| def llcp_release(llc): |
| summary("LLCP release") |
| global handover |
| handover.close() |
| return True |
| |
| def terminate_loop(): |
| global terminate_now |
| return terminate_now |
| |
| def main(): |
| clf = nfc.ContactlessFrontend() |
| |
| parser = argparse.ArgumentParser(description='nfcpy to wpa_supplicant integration for DPP NFC operations') |
| parser.add_argument('-d', const=logging.DEBUG, default=logging.INFO, |
| action='store_const', dest='loglevel', |
| help='verbose debug output') |
| parser.add_argument('-q', const=logging.WARNING, action='store_const', |
| dest='loglevel', help='be quiet') |
| parser.add_argument('--only-one', '-1', action='store_true', |
| help='run only one operation and exit') |
| parser.add_argument('--init-on-touch', '-I', action='store_true', |
| help='initiate handover on touch') |
| parser.add_argument('--no-wait', action='store_true', |
| help='do not wait for tag to be removed before exiting') |
| parser.add_argument('--ifname', '-i', |
| help='network interface name') |
| parser.add_argument('--no-input', '-a', action='store_true', |
| help='do not use stdout input to initiate handover') |
| parser.add_argument('--tag-read-only', '-t', action='store_true', |
| help='tag read only (do not allow connection handover)') |
| parser.add_argument('--handover-only', action='store_true', |
| help='connection handover only (do not allow tag read)') |
| parser.add_argument('--enrollee', action='store_true', |
| help='run as Enrollee-only') |
| parser.add_argument('--configurator', action='store_true', |
| help='run as Configurator-only') |
| parser.add_argument('--config-params', default='', |
| help='configurator parameters') |
| parser.add_argument('--ctrl', default='/var/run/wpa_supplicant', |
| help='wpa_supplicant/hostapd control interface') |
| parser.add_argument('--summary', |
| help='summary file for writing status updates') |
| parser.add_argument('--success', |
| help='success file for writing success update') |
| parser.add_argument('--device', default='usb', help='NFC device to open') |
| parser.add_argument('--chan', default=None, help='channel list') |
| parser.add_argument('--altchan', default=None, help='alternative channel list') |
| parser.add_argument('--netrole', default=None, help='netrole for Enrollee') |
| parser.add_argument('--test-uri', default=None, |
| help='test mode: initial URI') |
| parser.add_argument('--test-alt-uri', default=None, |
| help='test mode: alternative URI') |
| parser.add_argument('--test-sel-uri', default=None, |
| help='test mode: handover select URI') |
| parser.add_argument('--test-crn', default=None, |
| help='test mode: hardcoded crn') |
| parser.add_argument('command', choices=['write-nfc-uri', |
| 'write-nfc-hs'], |
| nargs='?') |
| args = parser.parse_args() |
| summary(args) |
| |
| global handover |
| handover = ConnectionHandover() |
| |
| global only_one |
| only_one = args.only_one |
| |
| global no_wait |
| no_wait = args.no_wait |
| |
| global chanlist, netrole, test_uri, test_alt_uri, test_sel_uri |
| global test_crn |
| chanlist = args.chan |
| handover.altchanlist = args.altchan |
| netrole = args.netrole |
| test_uri = args.test_uri |
| test_alt_uri = args.test_alt_uri |
| test_sel_uri = args.test_sel_uri |
| if args.test_crn: |
| test_crn = struct.pack('>H', int(args.test_crn)) |
| else: |
| test_crn = None |
| |
| logging.basicConfig(level=args.loglevel) |
| for l in ['nfc.clf.rcs380', |
| 'nfc.clf.transport', |
| 'nfc.clf.device', |
| 'nfc.clf.__init__', |
| 'nfc.llcp', |
| 'nfc.handover']: |
| log = logging.getLogger(l) |
| log.setLevel(args.loglevel) |
| |
| global init_on_touch |
| init_on_touch = args.init_on_touch |
| |
| global enrollee_only |
| enrollee_only = args.enrollee |
| |
| global configurator_only |
| configurator_only = args.configurator |
| |
| global config_params |
| config_params = args.config_params |
| |
| if args.ifname: |
| global ifname |
| ifname = args.ifname |
| summary("Selected ifname " + ifname) |
| |
| if args.ctrl: |
| global wpas_ctrl |
| wpas_ctrl = args.ctrl |
| |
| if args.summary: |
| global summary_file |
| summary_file = args.summary |
| |
| if args.success: |
| global success_file |
| success_file = args.success |
| |
| if args.no_input: |
| global no_input |
| no_input = True |
| |
| clf = nfc.ContactlessFrontend() |
| |
| try: |
| if not clf.open(args.device): |
| summary("Could not open connection with an NFC device", color=C_RED) |
| raise SystemExit(1) |
| |
| if args.command == "write-nfc-uri": |
| write_nfc_uri(clf, wait_remove=not args.no_wait) |
| if not operation_success: |
| raise SystemExit(1) |
| raise SystemExit |
| |
| if args.command == "write-nfc-hs": |
| write_nfc_hs(clf, wait_remove=not args.no_wait) |
| if not operation_success: |
| raise SystemExit(1) |
| raise SystemExit |
| |
| global continue_loop |
| while continue_loop: |
| global in_raw_mode |
| was_in_raw_mode = in_raw_mode |
| clear_raw_mode() |
| if was_in_raw_mode: |
| print("\r") |
| if args.handover_only: |
| summary("Waiting a peer to be touched", color=C_MAGENTA) |
| elif args.tag_read_only: |
| summary("Waiting for a tag to be touched", color=C_BLUE) |
| else: |
| summary("Waiting for a tag or peer to be touched", |
| color=C_GREEN) |
| handover.wait_connection = True |
| try: |
| if args.tag_read_only: |
| if not clf.connect(rdwr={'on-connect': rdwr_connected}): |
| break |
| elif args.handover_only: |
| if not clf.connect(llcp={'on-startup': llcp_startup, |
| 'on-connect': llcp_connected, |
| 'on-release': llcp_release}, |
| terminate=terminate_loop): |
| break |
| else: |
| if not clf.connect(rdwr={'on-connect': rdwr_connected}, |
| llcp={'on-startup': llcp_startup, |
| 'on-connect': llcp_connected, |
| 'on-release': llcp_release}, |
| terminate=terminate_loop): |
| break |
| except Exception as e: |
| summary("clf.connect failed: " + str(e)) |
| break |
| |
| if only_one and handover.connected: |
| role = "selector" if handover.i_m_selector else "requestor" |
| summary("Connection handover result: I'm the %s" % role, |
| color=C_YELLOW) |
| if handover.peer_uri: |
| summary("Peer URI: " + handover.peer_uri, color=C_YELLOW) |
| if handover.my_uri: |
| summary("My URI: " + handover.my_uri, color=C_YELLOW) |
| if not (handover.peer_uri and handover.my_uri): |
| summary("Negotiated connection handover failed", |
| color=C_YELLOW) |
| break |
| |
| except KeyboardInterrupt: |
| raise SystemExit |
| finally: |
| clf.close() |
| |
| raise SystemExit |
| |
| if __name__ == '__main__': |
| main() |