# This script is part of pymaid (http://www.github.com/navis-org/pymaid).
# Copyright (C) 2017 Philipp Schlegel
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along
import itertools
import os
import six
import sys
import warnings
import typing as tp
import pandas as pd
import numpy as np
from collections.abc import Iterable
with warnings.catch_warnings():
warnings.simplefilter("ignore")
try:
from vispy.visuals import Visual
except ImportError:
Visual = None
from . import core, fetch, config, client
# Set up logging
logger = config.get_logger(__name__)
__all__ = ['set_loggers', 'set_pbars', 'eval_skids', 'clear_cache', 'shorten_name']
[docs]
def clear_cache():
"""Clear cache of global CatmaidInstance."""
if 'remote_instance' in sys.modules:
rm = sys.modules['remote_instance']
elif 'remote_instance' in globals():
rm = globals()['remote_instance']
else:
raise ValueError('No global CatmaidInstance found.')
rm.clear_cache()
def _type_of_script():
"""Return context in which pymaid is run."""
try:
ipy_str = str(type(get_ipython()))
if 'zmqshell' in ipy_str:
return 'jupyter'
if 'terminal' in ipy_str:
return 'ipython'
except BaseException:
return 'terminal'
def is_jupyterlab():
"""Test if we are inside Jupyter lab."""
import psutil
return any(['jupyter-lab' in x for x in psutil.Process().parent().cmdline()])
def has_plotly_extension():
"""Check if Jupyter lab plotly renderer extension is installed."""
import subprocess
# This is the old plotly renderer
result = subprocess.run(['jupyter',
'labextension',
'check',
'@jupyterlab/plotly-extension'])
if result.returncode == 0:
return True
# This is the new one
result = subprocess.run(['jupyter',
'labextension',
'check',
'jupyterlab-plotly'])
if result.returncode == 0:
return True
return False
def is_headless():
"""Check if Display is available."""
return 'DISPLAY' not in os.environ
def is_jupyter():
"""Test if pymaid is run in a Jupyter notebook."""
return _type_of_script() == 'jupyter'
def ipywidgets_installed():
"""Test if pymaid is run in a Jupyter notebook."""
try:
import ipywidgets
return True
except ImportError:
return False
except BaseException as e:
logger.error('Error importing ipytwidgets: {}'.format(str(e)))
[docs]
def set_loggers(level='INFO'):
"""Helper function to set levels for all associated module loggers."""
config.logger.setLevel(level)
[docs]
def set_pbars(hide=None, leave=None, jupyter=None):
"""Set global progress bar behaviors.
Parameters
----------
hide : bool, optional
Set to True to hide all progress bars.
leave : bool, optional
Set to False to clear progress bars after they have finished.
jupyter : bool, optional
Set to False to force using of classic tqdm even if in
Jupyter environment.
Returns
-------
Nothing
"""
if isinstance(hide, bool):
config.pbar_hide = hide
if isinstance(leave, bool):
config.pbar_leave = leave
if isinstance(jupyter, bool):
if jupyter:
if not is_jupyter():
logger.error('Unable to use fancy Jupyter progress: '
'No Jupyter environment detected.')
elif not ipywidgets_installed():
logger.error('Unable to use fancy Jupyter progress: '
'ipywidgets not installed .')
else:
config.tqdm = config.tqdm_notebook
config.trange = config.tnrange
else:
config.tqdm = config.tqdm_classic
config.trange = config.trange_classic
def _make_iterable(x, force_type=None):
"""Convert input into a np.ndarray, if it isn't already.
For dicts, keys will be turned into array.
"""
if not isinstance(x, Iterable) or isinstance(x, six.string_types):
x = [x]
if isinstance(x, dict) or isinstance(x, set):
x = list(x)
if force_type:
return np.array(x).astype(force_type)
else:
return np.array(x)
def _make_non_iterable(x):
"""Convert input into non-iterable, if it isn't already.
Will raise error if len(x) > 1.
"""
if not _is_iterable(x):
return x
elif len(x) == 1:
return x[0]
else:
raise ValueError('Iterable must not contain more than one entry.')
def _is_iterable(x):
"""Check is input is iterable but not str, dictionary or pandas DataFrame.
"""
if isinstance(x, Iterable) and not isinstance(x, (six.string_types, pd.DataFrame)):
return True
else:
return False
def _eval_conditions(x):
"""Split list of strings into positive (no ~) and negative (~) conditions.
"""
x = _make_iterable(x, force_type=str)
return [i for i in x if not i.startswith('~')], [i[1:] for i in x if i.startswith('~')]
def _eval_remote_instance(remote_instance, raise_error=True) -> client.CatmaidInstance:
"""Evaluates remote instance.
If input is None, checks for globally defined remote instances as fall
back.
Parameters
----------
remote_instance : CatmaidInstance | None
Input to be evaluated.
raise_on_error : bool, optional
If True will raise error if input is ``None`` and
no global CatmaidInstance was found.
Returns
-------
CatmaidInstance
"""
if remote_instance is None:
if 'remote_instance' in sys.modules:
return sys.modules['remote_instance']
elif 'remote_instance' in globals():
return globals()['remote_instance']
else:
if raise_error:
raise Exception('No pymaid.CatmaidInstance found. Please '
'either define globally or pass explicitly '
'as "remote_instance". See '
'`help(pymaid.CatmaidInstance) for details.')
else:
logger.warning('No global remote instance found.')
elif not isinstance(remote_instance, client.CatmaidInstance):
error = 'Expected None or CatmaidInstance, got {}'.format(type(remote_instance))
if raise_error:
raise TypeError(error)
else:
logger.warning(error)
return remote_instance
[docs]
def eval_skids(x, remote_instance=None, warn_duplicates=True):
"""Extract skeleton IDs from input.
Will turn annotations and neuron names into skeleton IDs.
Parameters
----------
x : int | str | CatmaidNeuron | CatmaidNeuronList | DataFrame
Your options are either::
1. int or list of ints:
- will be assumed to be skeleton IDs
2. str or list of str:
- if convertible to int, will be interpreted as x
- if starts with 'annotation:' will be assumed to be
annotations
- else will be assumed to be neuron names
3. For CatmaidNeuron/List or pandas.DataFrames/Series:
- will look for ``skeleton_id`` attribute
remote_instance : CatmaidInstance, optional
If not passed directly, will try using global.
warn_duplicates : bool, optional
If True, will warn if duplicate skeleton IDs are found.
Only applies to CatmaidNeuronLists.
Returns
-------
list
List containing skeleton IDs as strings.
"""
remote_instance = _eval_remote_instance(remote_instance,
raise_error=False)
if isinstance(x, (int, np.int64, np.int32)):
return [str(x)]
elif isinstance(x, str):
try:
int(x)
return [str(x)]
except BaseException:
if x.startswith('annotation:') or x.startswith('annotations:'):
an = x[x.index(':') + 1:]
return fetch.get_skids_by_annotation(an,
remote_instance=remote_instance)
elif x.startswith('name:'):
return fetch.get_skids_by_name(x[5:],
remote_instance=remote_instance,
allow_partial=False
).skeleton_id.tolist()
else:
return fetch.get_skids_by_name(x,
remote_instance=remote_instance,
allow_partial=False
).skeleton_id.tolist()
elif isinstance(x, (list, np.ndarray, set)):
skids = []
for e in x:
temp = eval_skids(e, remote_instance=remote_instance)
if isinstance(temp, (list, np.ndarray)):
skids += temp
else:
skids.append(temp)
return sorted(set(skids), key=skids.index)
elif isinstance(x, core.CatmaidNeuron):
return [x.skeleton_id]
elif isinstance(x, core.CatmaidNeuronList):
if len(x.skeleton_id) != len(set(x.skeleton_id)) and warn_duplicates:
logger.warning('Duplicate skeleton IDs found in neuronlist. '
'The function you are using might not respect '
'fragments of the same neuron. For explanation see '
'http://pymaid.readthedocs.io/en/latest/source/conn'
'ectivity_analysis.html.')
return list(x.skeleton_id)
elif isinstance(x, pd.DataFrame):
if 'skeleton_id' not in x.columns:
raise ValueError('Expect "skeleton_id" column in pandas DataFrames')
return x.skeleton_id.tolist()
elif isinstance(x, pd.Series):
if x.name == 'skeleton_id':
return x.tolist()
elif 'skeleton_id' in x:
return [x.skeleton_id]
else:
raise ValueError('Unable to extract skeleton ID from Pandas '
'series {0}'.format(x))
elif isinstance(x, type(None)):
return None
else:
logger.error(
'Unable to extract x from type %s' % str(type(x)))
raise TypeError('Unable to extract skids from type %s' % str(type(x)))
def eval_user_ids(x, user_list=None, remote_instance=None):
"""Check a list of users and turns them into user IDs.
Always returns a list! Will attempt converting in the following order:
(1) user ID
(2) login name
(3) last name
(4) full name
(5) first name
Important
---------
Last, first and full names are case-sensitive!
Parameters
----------
x : int | str | list of either
Users to check.
user_list : pd.DataFrame, optional
User list from :func:`~pymaid.get_user_list`. If you
already have it, pass it along to save time.
"""
remote_instance = _eval_remote_instance(remote_instance)
if x and not isinstance(x, (list, np.ndarray)):
x = [x]
try:
# Test if we have any non IDs (i.e. logins) in users
user_ids = [int(u) for u in x]
except BaseException:
# Get list of users if we don't already have it
if not user_list:
user_list = fetch.get_user_list(
remote_instance=remote_instance)
# Now convert individual entries to user IDs
user_ids = []
for u in x:
try:
user_ids.append(int(u))
except BaseException:
for col in ['login', 'last_name', 'full_name', 'first_name']:
found = []
if u in user_list[col].values:
found = user_list[user_list[col] == u].id.tolist()
break
if not found:
logger.warning(
'User "{0}" not found. Skipping...'.format(u))
elif len(found) > 1:
logger.warning('Multiple matching entries for '
'"{0}" found. Skipping...'.format(u))
else:
user_ids.append(int(found[0]))
return user_ids
def eval_node_ids(x, connectors=True, nodes=True):
"""Extract node or connector IDs.
Parameters
----------
x : int | str | CatmaidNeuron | CatmaidNeuronList | DataFrame
Your options are either::
1. int or list of ints will be assumed to be node IDs
2. str or list of str will be checked if convertible to int
3. For CatmaidNeuron/List or pandas.DataFrames will try
to extract node IDs
connectors : bool, optional
If True will return connector IDs from neuron objects
nodes : bool, optional
If True will return node IDs from neuron objects
Returns
-------
list
List containing nodes as strings.
"""
if isinstance(x, (int, np.int64, np.int32)):
return [x]
elif isinstance(x, str):
try:
return [int(x)]
except BaseException:
raise TypeError(
'Unable to extract node ID from string <%s>' % str(x))
elif isinstance(x, (set, list, np.ndarray)):
# Check non-integer entries
ids = []
for e in x:
temp = eval_node_ids(e, connectors=connectors,
nodes=nodes)
if isinstance(temp, (list, np.ndarray)):
ids += temp
else:
ids.append(temp)
# Preserving the order after making a set is super costly
# return sorted(set(ids), key=ids.index)
return list(set(ids))
elif isinstance(x, core.CatmaidNeuron):
to_return = []
if nodes:
to_return += x.nodes.treenode_id.tolist()
if connectors:
to_return += x.connectors.connector_id.tolist()
return to_return
elif isinstance(x, core.CatmaidNeuronList):
to_return = []
for n in x:
if nodes:
to_return += n.nodes.treenode_id.tolist()
if connectors:
to_return += n.connectors.connector_id.tolist()
return to_return
elif isinstance(x, (pd.DataFrame, pd.Series)):
to_return = []
if nodes and 'node_id' in x:
to_return += x.node_id.tolist()
if connectors and 'connector_id' in x:
to_return += x.connector_id.tolist()
if 'connector_id' not in x and 'node_id' not in x:
to_return = x.tolist()
return to_return
else:
raise TypeError(f'Unable to extract node IDs from type {type(x)}')
def _unpack_neurons(x, raise_on_error=True):
"""Unpack neurons and returns a list of individual neurons."""
neurons = []
if isinstance(x, (list, np.ndarray, tuple)):
for l in x:
neurons += _unpack_neurons(l)
elif isinstance(x, core.CatmaidNeuron):
neurons.append(x)
elif isinstance(x, core.CatmaidNeuronList):
neurons += x.neurons
elif raise_on_error:
raise TypeError('Unknown neuron format: "{}"'.format(type(x)))
return neurons
def _parse_objects(x, remote_instance=None):
"""Parse objects into different types.
Returns
-------
skids : list
skdata : pymaid.CatmaidNeuronList
dotprops : pd.DataFrame
volumes : list
points : list of arrays
visuals : list of vispy visuals
"""
if not isinstance(x, list):
x = [x]
# If any list in x, flatten first
if any([isinstance(i, list) for i in x]):
# We need to be careful to preserve order because of colors
y = []
for i in x:
y += i if isinstance(i, list) else [i]
x = y
# Check for skeleton IDs
skids = []
for ob in x:
if isinstance(ob, (str, int)):
try:
skids.append(int(ob))
except BaseException:
pass
# Collect neuron objects and collate to single Neuronlist
neuron_obj = [ob for ob in x if isinstance(ob,
(core.CatmaidNeuron,
core.CatmaidNeuronList))]
skdata = core.CatmaidNeuronList(neuron_obj, make_copy=False)
# Collect visuals
if Visual is not None:
visuals = [ob for ob in x if isinstance(ob, Visual)]
else:
# Best guess if vispy not installed
visuals = [ob for ob in x if 'Visual' in str(type(x)) and 'vispy' in str(type(x))]
# Collect dotprops
dotprops = [ob for ob in x if isinstance(ob, core.Dotprops)]
if len(dotprops) == 1:
dotprops = dotprops[0]
elif len(dotprops) == 0:
dotprops = core.Dotprops()
dotprops['gene_name'] = []
elif len(dotprops) > 1:
dotprops = pd.concat(dotprops)
# Collect and parse volumes
volumes = [ob for ob in x if isinstance(ob, (core.Volume, str))]
# Collect dataframes with X/Y/Z coordinates
# Note: dotprops and volumes are instances of pd.DataFrames
dataframes = [ob for ob in x if isinstance(ob, pd.DataFrame) and not isinstance(ob, (core.Dotprops, core.Volume))]
if [d for d in dataframes if False in [c in d.columns for c in ['x', 'y', 'z']]]:
logger.warning('DataFrames must have x, y and z columns.')
# Filter to and extract x/y/z coordinates
dataframes = [d for d in dataframes if False not in [c in d.columns for c in ['x', 'y', 'z']]]
dataframes = [d[['x', 'y', 'z']].values for d in dataframes]
# Collect arrays
arrays = [ob.copy() for ob in x if isinstance(ob, np.ndarray)]
# Remove arrays with wrong dimensions
if [ob for ob in arrays if ob.shape[1] != 3]:
logger.warning('Point objects need to be of shape (n,3).')
arrays = [ob for ob in arrays if ob.shape[1] == 3]
points = dataframes + arrays
return skids, skdata, dotprops, volumes, points, visuals
def __guess_sentiment(x):
"""Classify a list of words.
Tries sorting words into either <type>, <nickname>, <tracer> or <generic>
annotations.
"""
sent = []
for i, w in enumerate(x):
# If word is a number, it's most likely something generic
if w.isdigit():
sent.append('generic')
elif w == 'neuron':
# If there is a lonely "neuron" followed by a number, it's generic
if i != len(x) and x[i + 1].isdigit():
sent.append('generic')
# If not, it's probably type
else:
sent.append('type')
# If there is a short, all upper case word after the generic information
elif w.isupper() and len(w) > 1 and w.isalpha() and 'generic' in sent:
# If there is no number in that word, it's probably tracer initials
sent.append('tracer')
else:
# If the word is AFTER the generic number, it's probably a nickname
if 'generic' in sent:
sent.append('nickname')
# If not, it's likely type information
else:
sent.append('type')
return sent
def parse_neuronname(x):
"""Parse neuron names into type, nickname, tracer and generic information.
This works best if neuron name follows this convention::
{type} {generic} {nickname} {tracer initials}
Parameters
----------
x : str | CatmaidNeuron
Neuron name.
Returns
-------
type : str
nickname : str
tracer : str
generic : str
Examples
--------
>>> pymaid.utils.parse_neuronname('AD1b2#7 3080184 Dust World JJ PS')
('AD1b2#7', 'Dust World', 'JJ PS', '3080184')
"""
if isinstance(x, core.CatmaidNeuron):
x = x.neuron_name
if not isinstance(x, str):
raise TypeError('Unable to parse name: must be str, not {}'.format(type(x)))
# Split name into single words
words = x.split(' ')
sentiments = __guess_sentiment(words)
type_str = [w for w, s in zip(words, sentiments) if s == 'type']
nick_str = [w for w, s in zip(words, sentiments) if s == 'nickname']
tracer_str = [w for w, s in zip(words, sentiments) if s == 'tracer']
gen_str = [w for w, s in zip(words, sentiments) if s == 'generic']
return ' '.join(type_str), ' '.join(nick_str), ' '.join(tracer_str), ' '.join(gen_str)
[docs]
def shorten_name(x, max_len=30):
"""Shorten a neuron name by iteratively removing non-essential bits.
Prioritises generic -> tracer -> nickname -> type information when removing
until target length is reached. This works best if neuron name follows
this convention::
{type} {generic} {nickname} {tracers}
Parameters
----------
x : str | CatmaidNeuron
Neuron name.
max_len : int, optional
Max length of shortened name.
Returns
-------
shortened name : str
Examples
--------
>>> pymaid.shorten_name('AD1b2#7 3080184 Dust World JJ PS', 30)
'AD1b2#7 Dust World [..]'
"""
if isinstance(x, core.CatmaidNeuron):
x = x.neuron_name
# Split into individual words and guess their type
words = x.split(' ')
sentiments = __guess_sentiment(words)
# Make sure we're working on a copy of the original neuron name
short = str(x)
ty = ['generic', 'tracer', 'nickname', 'type']
# Iteratively remove generic -> tracer -> nickname -> type information
for t, (w, sent) in itertools.product(ty, zip(words[::-1], sentiments[::-1])):
# Stop if we are below max length
if len(short) <= max_len:
break
# Stop if there is only a single word left
elif len(short.replace('[..]', '').strip().split(' ')) == 1:
break
# Skip if this word is not of the right sentiment
elif t != sent:
continue
# Remove this word
short = short.replace(w, '[..]').strip()
# Make sure to merge consecutive '[..]'
while '[..] [..]' in short:
short = short.replace('[..] [..]', '[..]')
return short
def to_float(x):
"""Convert input to float."""
try:
return float(x)
except ValueError:
return None
except BaseException:
raise
class DataFrameBuilder:
def __init__(
self, columns: tp.Sequence[tp.Hashable], dtypes: tp.Optional[tp.Sequence] = None
):
self.columns: tp.Dict[tp.Hashable, list] = {c: [] for c in columns}
self.dtypes: tp.Optional[list] = list(dtypes) if dtypes else None
if dtypes is not None and len(dtypes) != len(self.columns):
raise ValueError()
def _check_len(self, row: tp.Collection):
"""Raise an error if row is of incorrect length.
Parameters
----------
row : Collection
Row to be added (as dict or sequence).
Raises
------
ValueError
If row length does not match number of columns.
"""
if len(row) != len(self.columns):
raise ValueError(
f"Row length ({len(row)}) does not match number of columns ({len(self.columns)})"
)
def append_row(self, row: tp.Sequence):
"""Append a sequence to the rows.
Parameters
----------
row : Sequence
Must be same length as number of columns.
Returns
-------
self
"""
self._check_len(row)
for item, col in zip(row, self.columns.values()):
col.append(item)
return self
def append_dict(self, row: tp.Dict[tp.Hashable, tp.Any]):
"""Append a dict to the rows.
Parameters
----------
row : dict[tp.Hashable, tp.Any]
Keys must match columns.
Returns
-------
self
"""
self._check_len(row)
for k, v in row.items():
self.columns[k].append(v)
return self
def build(self, index_col=None) -> pd.DataFrame:
"""Build the dataframe.
Parameters
----------
index_col : Hashable, optional
Which column to use as the index, by default None
(i.e. numeric index in insertion order).
Returns
-------
pandas.DataFrame
"""
cols = dict()
index = None
for idx, (k, v) in enumerate(self.columns.items()):
dtype = self.dtypes[idx] if self.dtypes else None
v2 = pd.Series(v, dtype=dtype, name=k)
if k == index_col:
index = v2
else:
cols[k] = v2
df = pd.DataFrame.from_dict(cols)
if index is not None:
df.index = index
return df
def clean_points(
df: pd.DataFrame, fmt: tp.Union[str, tp.Callable[[str], tp.Hashable]], dims="xyz"
) -> pd.DataFrame:
"""Extract points from a dataframe.
Parameters
----------
df : pd.DataFrame
Dataframe, some of whose columns represent points.
fmt : tp.Union[str, tp.Callable[[str], tp.Hashable]]
Either a format string (e.g. ``"point_{}_1"``),
or a callable which takes a string and returns a column name.
When a dimension name (like ``"x"``) is passed to the format string,
or the callable, the result should be the name of a column in ``df``.
dims : str, optional
Dimension name order, by default "xyz"
Returns
-------
pd.DataFrame
The column index will be the dimensions given in ``dims``.
Call ``.to_numpy()`` to convert into a numpy array.
"""
if isinstance(fmt, str):
fmt_c = lambda s: fmt.format(s)
else:
fmt_c = fmt
cols = {fmt_c(d): d for d in dims}
return df[list(cols)].rename(columns=cols)