Examples

Low Level API Example

This example shows how to use the low level API.

from time import sleep

from toptica.lasersdk.client import Client, NetworkConnection
from toptica.lasersdk.client import UserLevel, Subscription, Timestamp, SubscriptionValue


# The following string defines the connection to the device. It can be either
# an IP address, a serial number or system label (when it is in the same subnet),
# or a DNS entry (e.g. 'dlcpro.example.com').
DLCPRO_CONNECTION = '172.16.109.105'


# This creates a new Client and connects to the device via Ethernet. The with-statement
# automatically calls Client.open() and Client.close() at the appropriate times (without
# it, these methods have to be called explicitly).
with Client(NetworkConnection(DLCPRO_CONNECTION)) as client:

    #
    # --- Client.get() ---

    # Client.get() queries the current value of a parameter. The type of the parameter
    # value is inferred from the response sent by the device.
    print("=== Connected Device ===")
    print("This is a {} with serial number {}.\n".format(
        client.get('system-type'), client.get('serial-number')))

    # Client.get() has an optional second parameter that defines the expected type of the
    # parameter value.
    #
    # The following line would raise a DecopValueError exception because the 'uptime-txt'
    # parameter is a string (and would therefore require 'str' as second parameter).
    #
    # print(client.get('uptime-txt', int))

    #
    # --- Client.set() ---

    # Save the current value of the display brightness so that it can be restored below.
    display_brightness = client.get('display:brightness')
    print("=== Display Brightness ===")
    print('Display Brightness - before:', display_brightness)

    # Client.set() changes the value of any writable parameter.
    client.set('display:brightness', 80)
    print('Display Brightness - after: ', client.get('display:brightness'), '\n')

    # Restore the original display brightness.
    client.set('display:brightness', display_brightness)

    #
    # --- Client.change_ul() ---

    # Client.change_ul() changes the userlevel of the connection.
    print("=== Userlevel ===")
    print('Userlevel - before:', UserLevel(client.get('ul')))

    # Change the userlevel to "MAINTENANCE" (the second argument is the password).
    client.change_ul(UserLevel.MAINTENANCE, 'CAUTION')
    print('Userlevel - after: ', UserLevel(client.get('ul')), '\n')

    # Change the userlevel back to "NORMAL" (this usually doesn't require a password).
    client.change_ul(UserLevel.NORMAL, '')

    #
    # --- Client.exec() ---

    # Client.exec() executes a command. The optional 'output_type=str' specifies
    # that the command has additional output that should be returned as a string
    # (the system messages in this case).
    system_messages = client.exec('system-messages:show-all', output_type=str)
    print("=== System Messages ===")
    print(system_messages)

    #
    # --- Client.subscribe() ---

    # This callback is used to print value changes of the parameter 'uptime-txt'.
    def uptime_txt_changed(subscription: Subscription, timestamp: Timestamp, value: SubscriptionValue):
        """Print the time of the update, the parameter name and the new value."""
        print("{}: '{}' = '{}'".format(timestamp.time(), subscription.name, value.get()))

    # Client.subscribe() adds a callback for value changes of a parameter.
    # Subscribing to value changes requires either calling Client.poll() regularly
    # (which processes all currently queued callbacks) or calling Client.run() (which
    # continuously processes callbacks and blocks until Client.stop() is called).
    print("=== Uptime Subscription ===")
    client.subscribe('uptime-txt', uptime_txt_changed)

    # Poll three times, once per second, so that queued callbacks get processed.
    for _ in range(3):
        sleep(1)  # Sleep for one second to simulate some work being done
        client.poll()

Synchronous API Example

This example shows the usage of the synchronous high level API.

import base64
import sys
import time

from toptica.lasersdk.dlcpro.v2_0_3 import DLCpro, NetworkConnection, DeviceNotFoundError, DecopError, UserLevel

# Base64-encoded service script; main() decodes it and passes it to dlc.service_script().
# Decoded, it is a short script with a checksum header that sets 'uptime' to 1234 and
# displays the text "Hallihallo".
script = 'OyBjaGVja3N1bSBmYzc4NDcwOWE5MTQwY2M1Mjg3ZWNiN2M1ZTAxYTQ3YQo7CihwYXJhbS1zZXQh' \
         'ICd1cHRpbWUgMTIzNCkKKGRpc3BsYXkgIkhhbGxpaGFsbG9cbiIp'


def my_callback(subscription, timestamp, value):
    """Print a framed report for one received parameter update.

    The report consists of the timestamp, the name of the subscription and the
    value obtained from ``value.get()``, enclosed by two separator lines.
    """
    frame = '--------------------------------------'

    # Each entry is fetched only when its line is printed, top to bottom.
    rows = (
        ('Timestamp: ', lambda: timestamp),
        ('     Name: ', lambda: subscription.name),
        ('    Value: ', lambda: value.get()),
    )

    print(frame)
    for label, fetch in rows:
        print(label, fetch())
    print(frame)


def main():
    """Demonstrate the synchronous high level API on a DLC pro.

    Connects to the device named by the first command line argument
    (``sys.argv[1]``) and then, in this order: reads several parameters,
    changes and restores the system label, runs commands with output and
    return values, runs a service script, changes the user level and finally
    monitors the 'uptime' parameter through a subscription.

    A DecopError raised by any of these steps is printed. A
    DeviceNotFoundError results in the message 'Device not found'.
    """
    try:
        with DLCpro(NetworkConnection(sys.argv[1])) as dlc:
            try:
                # Read a handful of parameters; each .get() returns the current value.
                print("\n\n=== Sync Read ===\n")
                print('     System Time :', dlc.time.get())
                print('          Uptime :', dlc.uptime.get())
                print('Firmware Version :', dlc.fw_ver.get())
                print('   System Health :', dlc.system_health_txt.get())
                print('        Emission :', dlc.emission.get())
                print('  Scan Frequency :', dlc.laser1.scan.frequency.get())

                # Change a writable parameter and restore its previous value afterwards.
                print("\n\n=== Sync Write ===\n")
                old_label = dlc.system_label.get()
                print('     System Label:', old_label)
                dlc.system_label.set('::: THE LABEL :::')
                print('     System Label:', dlc.system_label.get())
                dlc.system_label.set(old_label)
                print('     System Label:', dlc.system_label.get())

                # Execute a command and print what it returns.
                print("\n\n=== Sync Command Output ===\n")
                print(dlc.system_summary())

                # This command is executed at user level MAINTENANCE.
                print("\n\n=== Sync Command Output+Return ===\n")
                dlc.change_ul(UserLevel.MAINTENANCE, 'CAUTION')
                print('System Connections:', dlc.system_connections())
                dlc.ul.set(3)  # NOTE(review): 3 is presumably UserLevel.NORMAL - confirm

                # The script is stored Base64-encoded and is decoded before it is
                # passed to the command. Uptime is printed before and after.
                print("\n\n=== Sync Command Input ===\n")
                print('           Uptime:', dlc.uptime.get())
                dlc.service_script(base64.b64decode(script))
                print('           Uptime:', dlc.uptime.get())

                # Show the user level before, during and after the change.
                print("\n\n=== Sync Command Normal ===\n")
                print(' User Level:', dlc.ul.get())
                dlc.change_ul(UserLevel.MAINTENANCE, 'CAUTION')
                print(' User Level:', dlc.ul.get())
                dlc.ul.set(3)
                print(' User Level:', dlc.ul.get())

                # Subscribe my_callback to 'uptime' for the duration of the with-block
                # and poll five times, once per second.
                # NOTE(review): poll() presumably dispatches the queued callbacks - confirm.
                print("\n\n=== Monitoring ===\n")
                with dlc.uptime.subscribe(my_callback):
                    for _ in range(5):
                        dlc.poll()
                        time.sleep(1)

            except DecopError as error:
                print(error)
    except DeviceNotFoundError:
        print('Device not found')


if __name__ == "__main__":
    main()

Asynchronous API Example

This example shows the usage of the asynchronous high level API.

import asyncio
import base64
import sys

from toptica.lasersdk.asyncio.dlcpro.v2_0_3 import DLCpro, NetworkConnection, DeviceNotFoundError, DecopError, UserLevel

# Base64-encoded service script; main() decodes it and passes it to dlc.service_script().
# Decoded, it is a short script with a checksum header that sets 'uptime' to 1234 and
# displays the text "Hallihallo".
script = 'OyBjaGVja3N1bSBmYzc4NDcwOWE5MTQwY2M1Mjg3ZWNiN2M1ZTAxYTQ3YQo7CihwYXJhbS1zZXQh' \
         'ICd1cHRpbWUgMTIzNCkKKGRpc3BsYXkgIkhhbGxpaGFsbG9cbiIp'

# Create the event loop explicitly and switch on asyncio's debug mode for it.
# Calling asyncio.get_event_loop() while no loop is current is deprecated and
# raises a RuntimeError on recent Python versions. Once a loop has been installed
# with set_event_loop(), later asyncio.get_event_loop() calls return that loop.
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
event_loop.set_debug(True)


async def main():
    """Demonstrate the asynchronous high level API on a DLC pro.

    Connects to the device named by the first command line argument
    (``sys.argv[1]``) and then, in this order: reads several parameters,
    changes and restores the system label, runs commands with output and
    return values, runs a service script, changes the user level and finally
    awaits five updates of the 'uptime' parameter through a subscription.

    A DecopError raised by any of these steps is printed. A
    DeviceNotFoundError results in 'Device not found' being written to stderr.
    """
    try:
        async with DLCpro(NetworkConnection(sys.argv[1])) as dlc:
            try:
                # Read a handful of parameters; every .get() has to be awaited.
                print("\n\n=== Async Read ===\n")
                print('     System Time :', await dlc.time.get())
                print('          Uptime :', await dlc.uptime.get())
                print('Firmware Version :', await dlc.fw_ver.get())
                print('   System Health :', await dlc.system_health_txt.get())
                print('        Emission :', await dlc.emission.get())
                print('  Scan Frequency :', await dlc.laser1.scan.frequency.get())

                # Change a writable parameter and restore its previous value afterwards.
                print("\n\n=== Async Write ===\n")
                old_label = await dlc.system_label.get()
                print('     System Label:', old_label)
                await dlc.system_label.set('::: THE LABEL :::')
                print('     System Label:', await dlc.system_label.get())
                await dlc.system_label.set(old_label)
                print('     System Label:', await dlc.system_label.get())

                # Execute a command and print what it returns.
                print("\n\n=== Async Command Output ===\n")
                print(await dlc.system_summary())

                # This command is executed at user level MAINTENANCE.
                print("\n\n=== Async Command Output+Return ===\n")
                await dlc.change_ul(UserLevel.MAINTENANCE, 'CAUTION')
                print('System Connections:', await dlc.system_connections())
                await dlc.ul.set(3)  # NOTE(review): 3 is presumably UserLevel.NORMAL - confirm

                # The script is stored Base64-encoded and is decoded before it is
                # passed to the command. Uptime is printed before and after.
                print("\n\n=== Async Command Input ===\n")
                print('           Uptime:', await dlc.uptime.get())
                await dlc.service_script(base64.b64decode(script))
                print('           Uptime:', await dlc.uptime.get())

                # Show the user level before, during and after the change.
                print("\n\n=== Async Command Normal ===\n")
                print(' User Level:', await dlc.ul.get())
                await dlc.change_ul(UserLevel.MAINTENANCE, 'CAUTION')
                print(' User Level:', await dlc.ul.get())
                await dlc.ul.set(3)
                print(' User Level:', await dlc.ul.get())

                # Subscribe without a callback and await the next five updates;
                # each one is unpacked into a (timestamp, value) pair.
                print("\n\n=== Async Monitoring ===\n")
                subscription = await dlc.uptime.subscribe()

                for _ in range(5):
                    timestamp, value = await subscription.next()
                    print(f' {timestamp}: Uptime: {value}')

            except DecopError as error:
                print(error)

    except DeviceNotFoundError:
        sys.stderr.write('Device not found')


if __name__ == "__main__":
    # Run main() to completion on the current event loop.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # Close the loop even when main() raises, so that it is always released.
        loop.close()

    # NOTE(review): the reason for this 5 s delay is not evident from the code;
    # presumably it keeps the process alive for pending debug output - confirm.
    import time
    time.sleep(5)

DLC pro Scope and Lock Example

This example shows how to read and display scope and lock data from a DLC pro scientific laser using Matplotlib.

import sys

import matplotlib.pyplot as pyplot

from toptica.lasersdk.dlcpro.v2_0_3 import DLCpro, NetworkConnection, DeviceNotFoundError
from toptica.lasersdk.utils.dlcpro import *


def main():
    """Plot the scope traces and lock information of laser 1 with Matplotlib.

    Connects to the DLC pro named by the first command line argument
    (``sys.argv[1]``), reads the scope data and the lock candidates and shows
    them in a plot with up to two y axes: scope channel 1 (red, including the
    lock markers) and scope channel 2 (blue).

    If the device cannot be found, 'Device not found' is written to stderr.
    """
    # Numeric codes reported by the device for the states checked below
    signal_none = -3    # Scope channel signal is 'none'
    lock_selected = 3   # Lock state is 'Selected'
    lock_locked = 5     # Lock state is 'Locked'

    try:
        with DLCpro(NetworkConnection(sys.argv[1])) as dlcpro:
            laser = dlcpro.laser1
            scope = laser.scope

            # Retrieve the raw scan and lock data from the device
            traces = extract_float_arrays('xyY', scope.data.get())
            raw_candidates = laser.dl.lock.candidates.get()
            lock_points = extract_lock_points('clt', raw_candidates)
            lock_state = extract_lock_state(raw_candidates)

            # Create the figure; a second y axis is added further down if needed
            figure, left_axis = pyplot.subplots()
            figure.suptitle('DLC pro Scope Output')

            has_ch1 = scope.channel1.signal.get() != signal_none
            has_ch2 = scope.channel2.signal.get() != signal_none

            # Label and unit of the x axis
            x_label = "{} [{}]".format(
                scope.channelx.name.get(),
                scope.channelx.unit.get())
            left_axis.set_xlabel(x_label)

            if has_ch1:
                def plot_ring(points, size):
                    # Hollow red circle marker(s) of the given size on the left axis
                    left_axis.plot(
                        points['x'],
                        points['y'],
                        linestyle='None',
                        marker='o',
                        markersize=size,
                        color='red',
                        markerfacecolor='none',
                        zorder=3)

                # Label and unit of the left y axis
                y1_label = "{} [{}]".format(
                    scope.channel1.name.get(),
                    scope.channel1.unit.get())
                left_axis.set_ylabel(y1_label, color='red')

                # First scope channel
                left_axis.plot(
                    traces['x'],
                    traces['y'],
                    linestyle='solid',
                    color='red',
                    zorder=1)

                # Lock candidate points, if available
                if 'c' in lock_points:
                    candidates = lock_points['c']
                    left_axis.plot(
                        candidates['x'],
                        candidates['y'],
                        linestyle='None',
                        marker='o',
                        markersize=12.0,
                        color='grey',
                        zorder=2)

                # Selected lock candidate point, if available
                if 'l' in lock_points and lock_state == lock_selected:
                    plot_ring(lock_points['l'], 16.0)

                # Background trace, retrieved only while the lock is closed
                if lock_state == lock_locked:
                    background = extract_float_arrays(
                        'xy', laser.dl.lock.background_trace.get())
                    left_axis.plot(
                        background['x'],
                        background['y'],
                        linestyle='solid',
                        color='lightgrey',
                        zorder=1)

                # Lock tracking position, if available
                if 't' in lock_points:
                    plot_ring(lock_points['t'], 20.0)

            # Second scope channel, if available
            if has_ch2:
                # Use a twin axis only when the left axis is already taken by channel 1
                right_axis = left_axis.twinx() if has_ch1 else left_axis

                y2_label = "{} [{}]".format(
                    scope.channel2.name.get(),
                    scope.channel2.unit.get())
                right_axis.set_ylabel(y2_label, color='blue')

                right_axis.plot(
                    traces['x'],
                    traces['Y'],
                    linestyle='solid',
                    color='blue',
                    zorder=0)

                # Draw the left axis on top of the channel 2 axis
                left_axis.set_zorder(right_axis.get_zorder() + 1)
                left_axis.patch.set_visible(False)

            pyplot.margins(x=0.0)
            pyplot.show()

    except DeviceNotFoundError:
        sys.stderr.write('Device not found')


if __name__ == "__main__":
    main()

DLC pro Recorder Example

This example demonstrates how to download and save data recorded by a DLC pro recorder.

First use the WideScan of laser 1 to record data; the following example can then be used to download the raw data to a CSV file named “test.csv”.

# Example showing how to read the results from a WideScan

import sys

from toptica.lasersdk.dlcpro.v2_0_3 import DLCpro, NetworkConnection, DeviceNotFoundError
from toptica.lasersdk.utils.dlcpro import * # for extract_float_arrays(...)


def get_recorder_data(laser, chunk_size=1024):
    """Read the data recorded by the laser's recorder unit into a dictionary.

    The returned dictionary has the following content:
    x-axis data:       'x' : {'title': _label_and_physical_unit_,
                              'data': _array_with_x_values_}
    channel1 data:     'y' : {'title': _label_and_physical_unit_,
                              'data': _array_with_y_values_}
    channel2 data:     'Y' : {'title': _label_and_physical_unit_,
                              'data': _array_with_y_values_}
    sampling interval: 'dt': _sampling_interval_in_milliseconds_

    channel1 and channel2 data will only be available if they have been recorded,
    i.e. the respective selection was not None.

    Arguments:
        laser:      laser object whose 'recorder.data' node is read.
        chunk_size: maximum number of samples per channel requested from the
                    device in one call (default: 1024).

    Raises:
        ValueError: if chunk_size is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")

    recorder_data = laser.recorder.data

    def title_of(channel):
        # Axis title in the form 'name (unit)'
        return channel.name.get() + " (" + channel.unit.get() + ")"

    # read info about recorded signals and sample count
    x_title = title_of(recorder_data.channelx)
    y_title = title_of(recorder_data.channel1)
    Y_title = title_of(recorder_data.channel2)
    sampling_interval = recorder_data.recorded_sampling_interval.get()
    sample_count = recorder_data.recorded_sample_count.get()

    x = []
    y = []
    Y = []

    # read recorded data in chunks of up to chunk_size samples per channel
    index = 0
    while index < sample_count:
        # read a chunk of binary data and convert it into xy arrays
        new_data = extract_float_arrays('xyY', recorder_data.get_data(index, chunk_size))

        chunk_x = new_data['x'] if 'x' in new_data else []
        if not chunk_x:
            # The device delivered no further samples. Stop here, because the
            # index would not advance and the loop would never terminate.
            break

        x.extend(chunk_x)
        if 'y' in new_data:
            y.extend(new_data['y'])
        if 'Y' in new_data:
            Y.extend(new_data['Y'])
        index += len(chunk_x)

    # assemble result dictionary
    result = {'x': {'title': x_title, 'data': x}, 'dt': sampling_interval}
    if y:
        result['y'] = {'title': y_title, 'data': y}
    if Y:
        result['Y'] = {'title': Y_title, 'data': Y}

    return result
    
def save_recorder_data(filename, data):
    """Write a dictionary returned by get_recorder_data(..) into a comma
    separated file.

    First row will contain headers.
    First column will contain x-axis values.
    Second column will contain y-values of channel1, if available.
    Next column will contain y-values of channel2, if available.
    Last column will contain sampling time in milliseconds.

    Fields are written with the csv module, so a title that contains a comma
    or a quote character is quoted instead of silently adding extra columns.

    Arguments:
        filename: path of the file to write (an existing file is overwritten).
        data:     dictionary with the keys 'x' and 'dt' and optionally 'y'
                  and 'Y', as described for get_recorder_data(..).

    Raises:
        KeyError: if 'x' or 'dt' is missing from data (raised before the file
                  is created).
    """
    import csv  # local import: this function is the only user of the module

    # x is mandatory, the two channels are optional; column order is x, y, Y
    columns = [data['x']] + [data[key] for key in ('y', 'Y') if key in data]
    dt = data['dt']
    size = len(data['x']['data'])

    with open(filename, "w") as f:
        # "\n" as line terminator keeps the line endings the file had before
        # (the text mode file object translates them per platform).
        writer = csv.writer(f, lineterminator="\n")

        writer.writerow([column['title'] for column in columns] + ['time (ms)'])
        for i in range(size):
            values = [str(column['data'][i]) for column in columns]
            values.append(str(dt * i))
            writer.writerow(values)
    
def main():
    """Download the recorder data of laser 1 and store it in 'test.csv'.

    The device is named by the first command line argument (``sys.argv[1]``).
    If it cannot be found, 'Device not found' is written to stderr.
    """
    try:
        with DLCpro(NetworkConnection(sys.argv[1])) as dlcpro:
            # Read everything from the device first, then write the file
            save_recorder_data("test.csv", get_recorder_data(dlcpro.laser1))
    except DeviceNotFoundError:
        sys.stderr.write('Device not found')


if __name__ == "__main__":
    main()

Simple Data Logger Example

This example demonstrates how to use the low-level API and the monitoring line to implement a simple data logger that periodically saves parameter values to a text file.

from toptica.lasersdk.client import Client, NetworkConnection, DeviceNotFoundError, Subscription, Timestamp, SubscriptionValue
import time
import sys


# Module-level settings and state shared by the functions below.
# Each entry of 'params' becomes one column of the result table, after the
# leading "elapsed time" column.
params    = [                               # list of parameters to be logged
            "time", 
            "laser1:dl:cc:current-act", 
            "laser1:dl:tc:temp-set", 
            "laser1:dl:tc:temp-act"
            ]
                
interval  = 0.2                             # sampling interval in seconds
separator = "\t"                            # separator string for result table
snapshots = {}                              # global dictionary for parameter values (keyed by parameter name)


def update_snaptshots(subscription: Subscription, timestamp: Timestamp, value: SubscriptionValue):
    """Subscription callback: store the new value of a parameter.

    The value is stored in the global 'snapshots' dictionary under the name of
    the subscription, replacing the previous entry. The timestamp is not used.
    """
    latest = value.get()
    snapshots[subscription.name] = latest

def print_header(output):
    """Write the header row of the result table to 'output'.

    The first column is the elapsed time since the start of logging, followed
    by one column per entry of the global 'params' list.
    Argument gives the file object to write to.
    """
    parameter_columns = separator.join(params)
    output.write("elapsed time" + separator + parameter_columns + "\n")

def print_snapshot(output, elapsed_time):
    """Write one row with the current parameter values from the global dictionary.

    Argument one gives the file object to write to.
    Argument two gives the elapsed time since the start of logging in seconds;
    it is written to the first column with three decimal places.
    """
    time_column = "{:.3f}{}".format(elapsed_time, separator)
    value_columns = separator.join(str(snapshots[name]) for name in params)
    output.write(time_column + value_columns + "\n")

def main():
    """Log the parameters listed in 'params' to a text file until CTRL+C is pressed.

    Command line arguments:
        sys.argv[1]: IP address or serial number of the device.
        sys.argv[2]: name of the log file (an existing file is overwritten).

    If fewer than two arguments are given, a usage message is printed and the
    program exits. One row is written every 'interval' seconds; the values come
    from the global 'snapshots' dictionary, which is kept up to date through
    subscriptions on the monitoring line.
    """
    if len(sys.argv) < 3:
        print("usage: python paramlog.py ip-address/serial-number  filename")
        sys.exit()

    dlcpro_network_address = sys.argv[1]
    log_filename = sys.argv[2]

    try:
        with Client(NetworkConnection(dlcpro_network_address)) as dlc:

            # Store current value of all parameters in global dictionary
            # and subscribe for value changes on monitoring line
            for p in params:
                snapshots[p] = dlc.get(p)
                dlc.subscribe(p, update_snaptshots)

            # Initialize timekeeping. A monotonic clock is used so that changes
            # of the system time cannot disturb the sampling schedule.
            start_time = time.monotonic()
            next_sampling_due = start_time

            with open(log_filename, "w") as f:
                # Initialize result table
                print_header(f)
                print_snapshot(f, 0)

                # Loop until stopped by CTRL-C
                while True:
                    next_sampling_due += interval

                    # First wait until 50ms before next sampling is due. The delay
                    # is derived from the due time, so time spent on polling and
                    # writing does not accumulate as drift. It is never negative,
                    # which also covers intervals shorter than 50ms and cycles
                    # that are running late.
                    time.sleep(max(0.0, next_sampling_due - 0.05 - time.monotonic()))

                    # Then process all parameter updates received from the monitoring
                    # line so far (at least once, even when running late) and
                    # continue processing until next sampling is due
                    dlc.poll()
                    while time.monotonic() < next_sampling_due:
                        dlc.poll()

                    # Finally print snapshot
                    print_snapshot(f, time.monotonic() - start_time)

    except DeviceNotFoundError:
        print('Device not found')

    except KeyboardInterrupt:
        # use CTRL+C to stop logging
        print("\nFinished.")


if __name__ == "__main__":
    main()