Part 8. Setting up the data pipeline

61.

We are now ready to set up the data pipeline. We will want to create the objects in this order:

  1. RPLParallel (for both session01 and sessioneye)
  2. RPLSplit to create an RPLRaw object for each of the 110 channels (for both session01 and sessioneye)
  3. RPLLFP (which needs the RPLRaw object) for each of the 110 channels (for both session01 and sessioneye)
  4. RPLHighPass (which needs the RPLRaw object) for each of the 110 channels (for both session01 and sessioneye)
  5. Spike sorting (which needs the RPLHighPass objects for both session01 and sessioneye) for each of the 110 channels
  6. Unity (needs RPLParallel object)
  7. EDFSplit to create Eyelink objects (needs RPLParallel, and Unity if available) (for both session01 and sessioneye)
  8. Aligning_objects (needs RPLParallel, Unity, and Eyelink objects)
  9. Raycasting (needs Unity and Eyelink objects)
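The "needs" notes in the list above can be checked mechanically. The following is a minimal sketch, not part of PyHipp: the labels are plain strings chosen for illustration, and the helper first_violation is hypothetical. It confirms that the listed order satisfies every stated prerequisite, and that the order used by the script in step 63 (spike sorting moved to the end) satisfies them too.

```python
# Prerequisites as stated in the list above. The labels are plain strings used
# for illustration only; they are not calls into PyHipp.
NEEDS = {
    "RPLParallel": [],
    "RPLRaw": [],  # created by RPLSplit; the list states no prerequisite
    "RPLLFP": ["RPLRaw"],
    "RPLHighPass": ["RPLRaw"],
    "SpikeSorting": ["RPLHighPass"],
    "Unity": ["RPLParallel"],
    "Eyelink": ["RPLParallel", "Unity"],  # created by EDFSplit; Unity "if available"
    "AligningObjects": ["RPLParallel", "Unity", "Eyelink"],
    "Raycasting": ["Unity", "Eyelink"],
}

# Order given in the numbered list.
LISTED_ORDER = [
    "RPLParallel", "RPLRaw", "RPLLFP", "RPLHighPass", "SpikeSorting",
    "Unity", "Eyelink", "AligningObjects", "Raycasting",
]

# Order used by the job script, where spike sorting runs last.
SCRIPT_ORDER = [
    "RPLParallel", "RPLRaw", "RPLLFP", "RPLHighPass",
    "Unity", "Eyelink", "AligningObjects", "Raycasting", "SpikeSorting",
]


def first_violation(order, needs):
    """Return (step, missing prerequisite) for the first step that runs
    before one of its prerequisites, or None if the order is valid."""
    done = set()
    for step in order:
        for prerequisite in needs[step]:
            if prerequisite not in done:
                return (step, prerequisite)
        done.add(step)
    return None


print(first_violation(LISTED_ORDER, NEEDS))
print(first_violation(SCRIPT_ORDER, NEEDS))
```

Both calls print None. Any order that respects the prerequisites works, which is why spike sorting can be deferred to the end of the job.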

62.

We will first create a script for submitting jobs to the queue by copying the script you used in Lab 2 and editing the copy:

(env1) [ec2-user@ip-10-0-5-43 data] $ cd /data/src/PyHipp

(env1) [ec2-user@ip-10-0-5-43 PyHipp] $ cp slurm.sh pipeline-slurm.sh

(env1) [ec2-user@ip-10-0-5-43 PyHipp] $ nano pipeline-slurm.sh

63.

We will now enter what we did above in ipython into the script, but we will process just 8 channels instead of the full 110. What we did above processed only one of the 110 recorded neural channels, and even that required changing directories numerous times; processing 8 or 110 channels by hand would require many more directory changes. So instead, we will use a function called processDirs in DataProcessingTools, which automatically changes to the appropriate directory in which to create the specified objects. In addition, we will want to take note of the time taken to process the data. You can copy and paste the following lines to the end of the file:

python -u -c "import PyHipp as pyh; \
import DataProcessingTools as DPT; \
import os; \
import time; \
t0 = time.time(); \
print(time.localtime()); \
DPT.objects.processDirs(dirs=None, objtype=pyh.RPLParallel, saveLevel=1); \
DPT.objects.processDirs(dirs=None, objtype=pyh.RPLSplit, channel=[9, 31, 34, 56, 72, 93, 119, 120]); \
DPT.objects.processDirs(dirs=None, objtype=pyh.RPLLFP, saveLevel=1); \
DPT.objects.processDirs(dirs=None, objtype=pyh.RPLHighPass, saveLevel=1); \
DPT.objects.processDirs(dirs=None, objtype=pyh.Unity, saveLevel=1); \
pyh.EDFSplit(); \
os.chdir('session01'); \
pyh.aligning_objects(); \
pyh.raycast(1); \
DPT.objects.processDirs(level='channel', cmd='import PyHipp as pyh; from PyHipp import mountain_batch; mountain_batch.mountain_batch(); from PyHipp import export_mountain_cells; export_mountain_cells.export_mountain_cells();'); \
print(time.localtime()); \
print(time.time()-t0);"

aws sns publish --topic-arn arn:aws:sns:ap-southeast-1:xxxxxx:awsnotify --message "JobDone"

When the processDirs function is called with level and cmd arguments, it will find all the subdirectories that are at the appropriate level in the data hierarchy (in this case channel directories), and run the specified command in those directories. This will create a job that will perform the spike sorting and save the appropriate spiketrain files into cell directories as discussed in the lecture.
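To make the "find the directories at a level, then run a command in each" idea concrete, here is a minimal sketch using only the standard library. It is not the DataProcessingTools implementation: find_level_dirs and run_in_dirs are hypothetical helpers, the directory layout is made up for the demonstration, and matching directories by a name prefix is an assumption.

```python
import os
import tempfile


def find_level_dirs(root, prefix):
    """Return the sorted paths of all directories under root whose
    name starts with prefix (for example 'channel')."""
    matches = []
    for dirpath, dirnames, _ in os.walk(root):
        for name in dirnames:
            if name.startswith(prefix):
                matches.append(os.path.join(dirpath, name))
    return sorted(matches)


def run_in_dirs(dirs, action):
    """Call action() with each directory as the working directory,
    then restore the original working directory."""
    start = os.getcwd()
    results = []
    try:
        for d in dirs:
            os.chdir(d)
            results.append(action())
    finally:
        os.chdir(start)
    return results


# Build a throwaway hierarchy (illustrative layout, not the real data tree).
with tempfile.TemporaryDirectory() as root:
    for session in ("session01", "sessioneye"):
        for channel in ("channel009", "channel031"):
            os.makedirs(os.path.join(root, session, channel))

    channel_dirs = find_level_dirs(root, "channel")
    # The action here only reports where it ran; a real job would do the work.
    visited = run_in_dirs(channel_dirs, lambda: os.path.basename(os.getcwd()))

print(visited)
```

The action runs once per channel directory, and the working directory is restored afterwards, so the caller never has to change directory by hand.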

The last Python command computes the time elapsed between the start and the end of the job and prints it in seconds.
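The timing pattern can be tried on its own. The sketch below uses the same time.time() difference as the script; the strftime formatting and the format_elapsed helper are optional additions (not part of the script) that make the output easier to read than the raw struct_time printed by print(time.localtime()) and a bare number of seconds.

```python
import time


def format_elapsed(seconds):
    """Convert a number of seconds into an H:MM:SS string."""
    whole = int(round(seconds))
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours}:{minutes:02d}:{secs:02d}"


t0 = time.time()
# Human-readable start time.
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t0)))

time.sleep(0.1)  # stand-in for the real processing

elapsed = time.time() - t0  # elapsed wall-clock time in seconds
print(elapsed)
print(format_elapsed(5025.4))  # prints 1:23:45
```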

We will also edit the following line in the file to give the job more time (24 hours) to run:

#SBATCH --time=24:00:00   # walltime

as well as to make the job name and the slurm output files more distinct:

#SBATCH -J "pipe"   # job name

#SBATCH -o pipe-slurm.%N.%j.out # STDOUT
#SBATCH -e pipe-slurm.%N.%j.err # STDERR
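In the -o and -e filename patterns, Slurm replaces %N with the short hostname of the node and %j with the job ID. The sketch below only illustrates the resulting filenames: expand_pattern is a hypothetical helper that handles just these two specifiers, and the node name and job ID are made-up values.

```python
def expand_pattern(pattern, node, job_id):
    """Substitute the %N (node name) and %j (job ID) specifiers.
    Only these two specifiers are handled in this sketch."""
    return pattern.replace("%N", node).replace("%j", str(job_id))


# Hypothetical node name and job ID, for illustration only.
print(expand_pattern("pipe-slurm.%N.%j.out", "node01", 42))
print(expand_pattern("pipe-slurm.%N.%j.err", "node01", 42))
```

With these example values, the two files would be named pipe-slurm.node01.42.out and pipe-slurm.node01.42.err, so output from different jobs and nodes does not overwrite each other.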

64.

Save the file, and change directory to the 20181105 data directory:

(env1) [ec2-user@ip-10-0-5-43 PyHipp]$ cd /data/picasso/20181105