Examples

Low Level API Example

This example shows how to use the low-level API. It connects to the device address hard-coded in DLCPRO_CONNECTION.

from time import sleep

from toptica.lasersdk.client import Client, NetworkConnection
from toptica.lasersdk.client import UserLevel, Subscription, Timestamp, SubscriptionValue


# The following string defines the connection to the device. It can be an IP address,
# a serial number or system label (when the device is in the same subnet), or a DNS
# entry (e.g. 'dlcpro.example.com').
DLCPRO_CONNECTION = '172.16.109.105'


# Create a new Client and connect to the device via Ethernet. The with-statement
# automatically calls Client.open() and Client.close() at the appropriate times (without
# it these methods have to be called explicitly).
with Client(NetworkConnection(DLCPRO_CONNECTION)) as client:

    #
    # --- Client.get() ---

    # Client.get() queries the current value of a parameter. The type of the parameter
    # value is inferred from the response sent by the device.
    print("=== Connected Device ===")
    print("This is a {} with serial number {}.\n".format(
        client.get('system-type'), client.get('serial-number')))

    # Client.get() has an optional second parameter that defines the expected type of the
    # parameter value.
    #
    # The following line would raise a DecopValueError exception because the 'uptime-txt'
    # parameter is a string (and would therefore require 'str' as the second parameter).
    #
    # print(client.get('uptime-txt', int))

    #
    # --- Client.set() ---

    # Save the current value of the display brightness so it can be restored below.
    display_brightness = client.get('display:brightness')
    print("=== Display Brightness ===")
    print('Display Brightness - before:', display_brightness)

    # Client.set() changes the value of any writable parameter.
    client.set('display:brightness', 80)
    print('Display Brightness - after: ', client.get('display:brightness'), '\n')

    # Restore the original display brightness.
    client.set('display:brightness', display_brightness)

    #
    # --- Client.change_ul() ---

    # Client.change_ul() changes the userlevel of the connection. The 'ul' parameter
    # read here is converted to a UserLevel before it is printed.
    print("=== Userlevel ===")
    print('Userlevel - before:', UserLevel(client.get('ul')))

    # Change the userlevel to "MAINTENANCE" (the second argument is the password).
    client.change_ul(UserLevel.MAINTENANCE, 'CAUTION')
    print('Userlevel - after: ', UserLevel(client.get('ul')), '\n')

    # Change the userlevel back to "NORMAL" (this usually doesn't require a password).
    client.change_ul(UserLevel.NORMAL, '')

    #
    # --- Client.exec() ---

    # Client.exec() executes a command. The optional 'output_type=str' specifies that
    # the command has additional output that should be returned as a string (the
    # system messages in this case).
    system_messages = client.exec('system-messages:show-all', output_type=str)
    print("=== System Messages ===")
    print(system_messages)

    #
    # --- Client.subscribe() ---

    # This callback prints every value change of the subscribed parameter ('uptime-txt'
    # below) together with the time of the change.
    def uptime_txt_changed(subscription: Subscription, timestamp: Timestamp, value: SubscriptionValue):
        print("{}: '{}' = '{}'".format(timestamp.time(), subscription.name, value.get()))

    # Client.subscribe() adds a callback for value changes of a parameter.
    # Callbacks are only delivered while the client processes them: either call
    # Client.poll() regularly (it processes all currently queued callbacks) or call
    # Client.run() (it continuously processes callbacks and blocks until Client.stop()
    # is called).
    print("=== Uptime Subscription ===")
    client.subscribe('uptime-txt', uptime_txt_changed)

    # Poll three times, one second apart, so that queued value changes get printed.
    for _ in range(3):
        sleep(1)  # Sleep for one second to simulate some work being done
        client.poll()

Synchronous API Example

This example shows how to use the synchronous high-level API. The connection string of the device is expected as the first command-line argument.

import base64
import sys
import time

from toptica.lasersdk.dlcpro.v2_0_3 import DLCpro, NetworkConnection, DeviceNotFoundError, DecopError, UserLevel

# Base64-encoded payload for the "Sync Command Input" step: main() decodes it with
# base64.b64decode() and passes the result to dlc.service_script(), printing the
# uptime before and after the call.
script = 'OyBjaGVja3N1bSBmYzc4NDcwOWE5MTQwY2M1Mjg3ZWNiN2M1ZTAxYTQ3YQo7CihwYXJhbS1zZXQh' \
         'ICd1cHRpbWUgMTIzNCkKKGRpc3BsYXkgIkhhbGxpaGFsbG9cbiIp'


def my_callback(subscription, timestamp, value):
    """Print one subscription update as a small framed report.

    Writes the timestamp, the name of the subscription and the value returned
    by ``value.get()`` to stdout, enclosed by two separator lines.
    """
    separator = '--------------------------------------'

    print(separator)
    print('Timestamp: ', timestamp)
    print('     Name: ', subscription.name)
    print('    Value: ', value.get())
    print(separator)


def main():
    """Demonstrate the synchronous high level API on a DLC pro.

    The connection string of the device is read from the first command-line
    argument (``sys.argv[1]``). The demo reads a few parameters, temporarily
    changes the system label, runs commands with and without output, executes
    the base64-encoded service script and finally polls an uptime subscription
    five times, one second apart.

    If the argument is missing, a usage message is written to stderr. A
    DeviceNotFoundError prints 'Device not found', and a DecopError raised while
    the connection is open is printed. In all of these cases the function
    returns normally.
    """
    if len(sys.argv) < 2:
        # Without this check a missing argument surfaces as a bare IndexError traceback.
        print('Usage: {} <device address>'.format(sys.argv[0] if sys.argv else 'example'),
              file=sys.stderr)
        return

    try:
        with DLCpro(NetworkConnection(sys.argv[1])) as dlc:
            # DecopError is handled inside the with-block so that the connection is
            # still open while the error is reported.
            try:
                print("\n\n=== Sync Read ===\n")
                print('     System Time :', dlc.time.get())
                print('          Uptime :', dlc.uptime.get())
                print('Firmware Version :', dlc.fw_ver.get())
                print('   System Health :', dlc.system_health_txt.get())
                print('        Emission :', dlc.emission.get())
                print('  Scan Frequency :', dlc.laser1.scan.frequency.get())

                print("\n\n=== Sync Write ===\n")
                # Remember the label so that the device is left as it was found.
                old_label = dlc.system_label.get()
                print('     System Label:', old_label)
                dlc.system_label.set('::: THE LABEL :::')
                print('     System Label:', dlc.system_label.get())
                dlc.system_label.set(old_label)
                print('     System Label:', dlc.system_label.get())

                print("\n\n=== Sync Command Output ===\n")
                print(dlc.system_summary())

                print("\n\n=== Sync Command Output+Return ===\n")
                dlc.change_ul(UserLevel.MAINTENANCE, 'CAUTION')
                print('System Connections:', dlc.system_connections())
                # Leave the maintenance level again by writing the 'ul' parameter
                # directly (3 is presumably UserLevel.NORMAL -- TODO confirm).
                dlc.ul.set(3)

                print("\n\n=== Sync Command Input ===\n")
                print('           Uptime:', dlc.uptime.get())
                dlc.service_script(base64.b64decode(script))
                print('           Uptime:', dlc.uptime.get())

                print("\n\n=== Sync Command Normal ===\n")
                print(' User Level:', dlc.ul.get())
                dlc.change_ul(UserLevel.MAINTENANCE, 'CAUTION')
                print(' User Level:', dlc.ul.get())
                dlc.ul.set(3)
                print(' User Level:', dlc.ul.get())

                print("\n\n=== Monitoring ===\n")
                # The subscription only lives inside this with-block; poll() is what
                # triggers the queued my_callback invocations.
                with dlc.uptime.subscribe(my_callback):
                    for _ in range(5):
                        dlc.poll()
                        time.sleep(1)

            except DecopError as error:
                print(error)
    except DeviceNotFoundError:
        print('Device not found')


# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

Asynchronous API Example

This example shows how to use the asynchronous high-level API. The connection string of the device is expected as the first command-line argument.

import asyncio
import base64
import sys

from toptica.lasersdk.asyncio.dlcpro.v2_0_3 import DLCpro, NetworkConnection, DeviceNotFoundError, DecopError, UserLevel

# Base64-encoded payload for the "Async Command Input" step: main() decodes it with
# base64.b64decode() and passes the result to dlc.service_script(), printing the
# uptime before and after the call.
script = 'OyBjaGVja3N1bSBmYzc4NDcwOWE5MTQwY2M1Mjg3ZWNiN2M1ZTAxYTQ3YQo7CihwYXJhbS1zZXQh' \
         'ICd1cHRpbWUgMTIzNCkKKGRpc3BsYXkgIkhhbGxpaGFsbG9cbiIp'

# Switch the event loop returned by asyncio.get_event_loop() into asyncio debug mode.
# This runs at import time, before main() is started.
asyncio.get_event_loop().set_debug(True)


async def main():
    """Demonstrate the asynchronous high level API on a DLC pro.

    The connection string of the device is read from the first command-line
    argument (``sys.argv[1]``). The demo reads a few parameters, temporarily
    changes the system label, runs commands with and without output, executes
    the base64-encoded service script and finally awaits five updates of an
    uptime subscription.

    If the argument is missing, a usage message is written to stderr. A
    DeviceNotFoundError writes 'Device not found' to stderr, and a DecopError
    raised while the connection is open is printed. In all of these cases the
    coroutine returns normally.
    """
    if len(sys.argv) < 2:
        # Without this check a missing argument surfaces as a bare IndexError traceback.
        sys.stderr.write('Usage: {} <device address>\n'.format(
            sys.argv[0] if sys.argv else 'example'))
        return

    try:
        async with DLCpro(NetworkConnection(sys.argv[1])) as dlc:
            # DecopError is handled inside the with-block so that the connection is
            # still open while the error is reported.
            try:
                print("\n\n=== Async Read ===\n")
                print('     System Time :', await dlc.time.get())
                print('          Uptime :', await dlc.uptime.get())
                print('Firmware Version :', await dlc.fw_ver.get())
                print('   System Health :', await dlc.system_health_txt.get())
                print('        Emission :', await dlc.emission.get())
                print('  Scan Frequency :', await dlc.laser1.scan.frequency.get())

                print("\n\n=== Async Write ===\n")
                # Remember the label so that the device is left as it was found.
                old_label = await dlc.system_label.get()
                print('     System Label:', old_label)
                await dlc.system_label.set('::: THE LABEL :::')
                print('     System Label:', await dlc.system_label.get())
                await dlc.system_label.set(old_label)
                print('     System Label:', await dlc.system_label.get())

                print("\n\n=== Async Command Output ===\n")
                print(await dlc.system_summary())

                print("\n\n=== Async Command Output+Return ===\n")
                await dlc.change_ul(UserLevel.MAINTENANCE, 'CAUTION')
                print('System Connections:', await dlc.system_connections())
                # Leave the maintenance level again by writing the 'ul' parameter
                # directly (3 is presumably UserLevel.NORMAL -- TODO confirm).
                await dlc.ul.set(3)

                print("\n\n=== Async Command Input ===\n")
                print('           Uptime:', await dlc.uptime.get())
                await dlc.service_script(base64.b64decode(script))
                print('           Uptime:', await dlc.uptime.get())

                print("\n\n=== Async Command Normal ===\n")
                print(' User Level:', await dlc.ul.get())
                await dlc.change_ul(UserLevel.MAINTENANCE, 'CAUTION')
                print(' User Level:', await dlc.ul.get())
                await dlc.ul.set(3)
                print(' User Level:', await dlc.ul.get())

                print("\n\n=== Async Monitoring ===\n")
                subscription = await dlc.uptime.subscribe()

                # Unlike the callback-based synchronous API, updates are pulled one
                # at a time by awaiting subscription.next().
                for _ in range(5):
                    timestamp, value = await subscription.next()
                    print(f' {timestamp}: Uptime: {value}')

            except DecopError as error:
                print(error)

    except DeviceNotFoundError:
        # Terminate the message with a newline so it does not run into the shell prompt.
        sys.stderr.write('Device not found\n')


# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    # Drive the main() coroutine to completion on the event loop and close the loop.
    # NOTE(review): newer Python versions deprecate asyncio.get_event_loop() when no
    # loop is running; consider asyncio.run(main(), debug=True) -- confirm the minimum
    # supported Python version first.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

    # NOTE(review): the reason for this five second pause after the loop has been
    # closed is not evident from this file -- confirm whether it is still needed.
    import time
    time.sleep(5)

DLC pro Scope and Lock Example

This example shows how to read and display scope and lock data from a DLC pro scientific laser using Matplotlib. The connection string of the device is expected as the first command-line argument.

import sys

import matplotlib.pyplot as pyplot

from toptica.lasersdk.dlcpro.v2_0_3 import DLCpro, NetworkConnection, DeviceNotFoundError
from toptica.lasersdk.utils.dlcpro import *


def main():
    """Plot the scope traces and lock information of laser1 of a DLC pro.

    The connection string of the device is read from the first command-line
    argument (``sys.argv[1]``). Scope data and lock candidates are fetched once,
    then drawn with Matplotlib: scope channel 1 on the left Y axis (red),
    together with the lock candidate, selected-lock, tracking and background
    trace overlays, and scope channel 2 (blue) on a second Y axis when both
    channels are in use. The call blocks in ``pyplot.show()``.

    If the argument is missing, a usage message is written to stderr. A
    DeviceNotFoundError writes 'Device not found' to stderr. In both cases the
    function returns normally.
    """
    if len(sys.argv) < 2:
        # Without this check a missing argument surfaces as a bare IndexError traceback.
        sys.stderr.write('Usage: {} <device address>\n'.format(
            sys.argv[0] if sys.argv else 'example'))
        return

    try:
        with DLCpro(NetworkConnection(sys.argv[1])) as dlcpro:
            # Retrieve scan, lock raw data from device
            scope_data = extract_float_arrays('xyY', dlcpro.laser1.scope.data.get())
            raw_lock_candidates = dlcpro.laser1.dl.lock.candidates.get()
            lock_candidates = extract_lock_points('clt', raw_lock_candidates)
            lock_state = extract_lock_state(raw_lock_candidates)

            # Create double y axis plot
            fig, laxis = pyplot.subplots()
            fig.suptitle('DLC pro Scope Output')

            ch1_available = dlcpro.laser1.scope.channel1.signal.get() != -3  # Signal is 'none'
            ch2_available = dlcpro.laser1.scope.channel2.signal.get() != -3

            # Set label and unit of X axis
            laxis.set_xlabel("{} [{}]".format(
                dlcpro.laser1.scope.channelx.name.get(),
                dlcpro.laser1.scope.channelx.unit.get()))

            if ch1_available:
                red = laxis

                # Set label and unit of left Y axis
                red.set_ylabel("{} [{}]".format(
                    dlcpro.laser1.scope.channel1.name.get(),
                    dlcpro.laser1.scope.channel1.unit.get()),
                    color='red')

                # Plot first scope channel data
                red.plot(
                    scope_data['x'],
                    scope_data['y'],
                    linestyle='solid',
                    color='red',
                    zorder=1)

                # Plot lock candidate points if available
                if 'c' in lock_candidates:
                    red.plot(
                        lock_candidates['c']['x'],
                        lock_candidates['c']['y'],
                        linestyle='None',
                        marker='o',
                        markersize=12.0,
                        color='grey',
                        zorder=2)

                # Plot selected lock candidate point if available
                if 'l' in lock_candidates and lock_state == 3:  # State is 'Selected'
                    red.plot(
                        lock_candidates['l']['x'],
                        lock_candidates['l']['y'],
                        linestyle='None',
                        marker='o',
                        markersize=16.0,
                        color='red',
                        markerfacecolor='none',
                        zorder=3)

                # Retrieve and plot background trace data if lock is closed
                if lock_state == 5:  # State is 'Locked'
                    background_trace = extract_float_arrays('xy', dlcpro.laser1.dl.lock.background_trace.get())

                    red.plot(
                        background_trace['x'],
                        background_trace['y'],
                        linestyle='solid',
                        color='lightgrey',
                        zorder=1)

                # Plot lock tracking position if available
                if 't' in lock_candidates:
                    red.plot(
                        lock_candidates['t']['x'],
                        lock_candidates['t']['y'],
                        linestyle='None',
                        marker='o',
                        markersize=20.0,
                        color='red',
                        markerfacecolor='none',
                        zorder=3)

            # Plot second scope channel data if available
            if ch2_available:
                # Channel 2 gets its own Y axis only when channel 1 already occupies
                # the left one; otherwise it is drawn on the primary axis.
                if ch1_available:
                    blue = laxis.twinx()
                else:
                    blue = laxis

                blue.set_ylabel("{} [{}]".format(
                    dlcpro.laser1.scope.channel2.name.get(),
                    dlcpro.laser1.scope.channel2.unit.get()),
                    color='blue')

                blue.plot(
                    scope_data['x'],
                    scope_data['Y'],
                    linestyle='solid',
                    color='blue',
                    zorder=0)

                # Raise the primary axis above the channel 2 axis and hide its
                # background patch so the channel 2 trace stays visible underneath.
                laxis.set_zorder(blue.get_zorder() + 1)
                laxis.patch.set_visible(False)

            pyplot.margins(x=0.0)
            pyplot.show()

    except DeviceNotFoundError:
        # Terminate the message with a newline so it does not run into the shell prompt.
        sys.stderr.write('Device not found\n')


# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()