Azure BatchでDNAシークエンスデータをアライメントしてみる

目的

Azure BatchでDNAシークエンスデータをアライメントしてみる。勉強目的。

 

この記事でやること

Azure Batchでbwaを実行してみる。以下の記事に沿って勉強する。

チュートリアル - Python 向け Azure Batch SDK を使用する | Microsoft Docs

インストール方法やAzure Batchを実行するためのscriptは↑このページを参照した。今回はAzure BatchでBWAを実行することに主眼を置く。

 

実行環境

Azure 仮想マシン Standard DS1 v2 (1 vcpu、3.5 GB メモリ)  Ubuntu

 

アカウントが必要

・Azure アカウント (メインのアカウント)
・Azure Batchアカウント( Azure Portal で Batch アカウントを作成する | Microsoft Docs 
・ストレージアカウント( Azure Portal でストレージ アカウントを作成… | Microsoft Docs 

 

Pythonのモジュールのインストール

# azure-batchとazure-strageにアクセスするためのpythonパッケージ
pip install azure-batch
pip install azure-storage

 

Azure Batchってなに?

・並列処理が簡単にできる
・コンピューティングリソースを自動的に拡大/縮小できる
解析したい検体数の仮想マシンが自動的に立ち上がることを期待したい!

 

ゲノム解析でとても効果的な機能

・低優先度VMが使用できる。VMの割り込みが起こったタスクは再度キューに登録される。
・コンピューティングノードの最大数が指定できる。

 

プールとは

VMをデプロイする領域
VMのスケール(最低→最大VM数)の指定をする
・AZ_BATCH_NODE_SHARED_DIRとあるが、物理的にshareのなディスクにおいているわけではない

 

ジョブとは

・タスクの置き場、タスクをキューで登録できる
・タスクとは引数ありの実行コマンドとほぼおなじ

 

処理概要

f:id:ken0-1n:20180105112841p:plain

-事前準備

・リファレンスゲノム(bwa index付き)をストレージにアップロードしておく
・アライメントするFASTQファイルをストレージにアップロードしておく
・実行Scriptをgithubにアップロードしておく

-RUNすると

1. python moduleのインポート
2. Container outputを作成する
3. poolを作成する
4. Jobを作成する
5. taskを登録する→poolに空きがあればTASKが実行される
6. taskの完了を待つ

-taskに登録されるscript

・自作したpython_bwa_task.pyについて
 

1. python moduleのインポート

# azure-batchとazure-strageにアクセスするためのpythonパッケージ
import azure.storage.blob as azureblob
import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batchauth
import azure.batch.models as batchmodels

2. container output を作成する

# まずはblob clientを生成する。Containerの作成や削除、Blobの操作など、
# Strage Accountでの操作を行うためのインスタンス。
blob_client = azureblob.BlockBlobService(
account_name=_YOUR_STORAGE_ACCOUNT_NAME,
account_key=_YOUR_STORAGE_ACCOUNT_KEY)
# container outputの作成
output_container_name = 'output'
blob_client.create_container(output_container_name, fail_on_exist=False)

3. poolを作成する

# まずはbatch clientを生成する。poolの作成や削除、jobの作成や削除など、
# Batch Accountでの操作を行うためのインスタンス。
credentials = batchauth.SharedKeyCredentials(
_YOUR_BATCH_ACCOUNT_NAME, _YOUR_BATCH_ACCOUNT_KEY)
batch_client = batch.BatchServiceClient(
credentials, base_url=_BATCH_ACCOUNT_URL)

VMインスタンスが立ち上がった時に実行されるコマンドの作成
task_commands = [
  'curl -fSsL https://bootstrap.pypa.io/get-pip.py | python',
  'pip install azure-storage==0.32.0',
  'apt-get update && apt-get install -y wget bzip2 make gcc zlib1g-dev',
  'wget http://sourceforge.net/projects/bio-bwa/files/bwa-0.7.15.tar.bz2',
  'tar xjvf bwa-0.7.15.tar.bz2',
  'cd bwa-0.7.15',
  'make',
  'cd ..',
  'cp -rp bwa-0.7.15 $AZ_BATCH_NODE_SHARED_DIR',
  'wget https://github.com/ken0-1n/AzureBatchTutorialBwa/archive/v0.1.0.tar.gz',
  'tar xzvf v0.1.0.tar.gz',
  'cp -p AzureBatchTutorialBwa-0.1.0/{} $AZ_BATCH_NODE_SHARED_DIR'.format(_TUTORIAL_TASK_FILE)
  ]

# VMに指定するイメージの取得
sku_to_use, image_ref_to_use = \
  common.helpers.select_latest_verified_vm_image_with_node_agent_sku(
  batch_service_client, 'Canonical', 'UbuntuServer', '16')
 
# ユーザ権限の設定
user = batchmodels.AutoUserSpecification(
  scope=batchmodels.AutoUserScope.pool,
  elevation_level=batchmodels.ElevationLevel.admin)

# poolの定義, pool IDにはアカウント内でユニークな名前をつけること
new_pool = batch.models.PoolAddParameter(
  id=pool_id,
  virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
  image_reference=image_ref_to_use,
  node_agent_sku_id=sku_to_use),
  vm_size=_POOL_VM_SIZE, # 'BASIC_A2'を指定
  target_dedicated_nodes=_POOL_NODE_COUNT, # 2を指定。pool内に作られるVMの最大数
  start_task=batch.models.StartTask(
  command_line=common.helpers.wrap_commands_in_shell('linux',
  task_commands),
  user_identity=batchmodels.UserIdentity(auto_user=user),
  wait_for_success=True))

batch_service_client.pool.add(new_pool)

4. jobを作成する

# poolは先ほど作成したpoolを指定する。
job = batch.models.JobAddParameter(_JOB_ID,
   batch.models.PoolInformation(pool_id=POOL_ID))
batch_service_client.job.add(job)

5. taskを登録する

# taskを登録するときにStrageにあるBlobの情報を指定し、
ResourceFileインスタンスを生成しなければならない。そのためのメソッド。
def get_resource(block_blob_client, container_name, blob_name):
   # containerにアクセスするためのtokenの生成
   sas_token = block_blob_client.generate_blob_shared_access_signature(
   container_name,
   blob_name,
   permission=azureblob.BlobPermissions.READ,
   expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=2))
   # blobのurlの生成
   sas_url = block_blob_client.make_blob_url(container_name,
   blob_name,
   sas_token=sas_token)    # blobの情報を設定したインスタンスの生成    return batchmodels.ResourceFile(file_path=blob_name,  blob_source=sas_url)
# StrageにおいてあるFASTQファイルの情報が設定されたインスタンスを生成する
input_container_name = 'fastq'
input_files1 = [get_resourcefile(blob_client, ref_container_name, ref_file_path)
for ref_file_path in ['5929T/sequence1.fastq', '5929N/sequence1.fastq']]
input_files2 = [get_resourcefile(blob_client, ref_container_name, ref_file_path)
for ref_file_path in ['5929T/sequence2.fastq', '5929N/sequence2.fastq']]

# Strageにおいてあるリファレンスゲノムの情報が設定されたインスタンスを生成する
ref_container_name = 'ref'
ref_files = [get_resourcefile(blob_client, ref_container_name, ref_file_path)
for ref_file_path in ['hg19/chr22.fa', 'hg19/chr22.fa.amb',
'hg19/chr22.fa.ann','hg19/chr22.fa.bwt','hg19/chr22.fa.pac','hg19/chr22.fa.sa']]
 # container outputにアクセスするためのtokenの生成
output_container_sas_token = get_container_sas_token(
blob_client,
output_container_name,
azureblob.BlobPermissions.WRITE)

# 実行コマンドをTaskに登録する(5929T)
command = ['python $AZ_BATCH_NODE_SHARED_DIR/python_bwa_task.py --storageaccount chiba1sa --storagecontainer output --sastoken "xxxxxxxxxx" --bwapath $AZ_BATCH_NODE_SHARED_DIR/bwa-0.7.15/bwa --refgenome hg19/chr22.fa --samplename 5929T --fastq1 5929T/sequence1.fastq --fastq2 5929T/sequence2.fastq]
tasks.append(batch.models.TaskAddParameter(
'topNtask1',
common.helpers.wrap_commands_in_shell('linux', command),
resource_files=[5929T/sequence1.fastq, 5929T/sequence2.fastq])

# 実行コマンドをTaskに登録する(5929N)
command = ['python $AZ_BATCH_NODE_SHARED_DIR/python_bwa_task.py --storageaccount chiba1sa --storagecontainer output --sastoken "xxxxxxxxxx" --bwapath $AZ_BATCH_NODE_SHARED_DIR/bwa-0.7.15/bwa --refgenome hg19/chr22.fa --samplename 5929N --fastq1 5929N/sequence1.fastq --fastq2 5929N/sequence2.fastq]
tasks.append(batch.models.TaskAddParameter(
'topNtask2',
common.helpers.wrap_commands_in_shell('linux', command),
resource_files=[5929N/sequence1.fastq, 5929N/sequence2.fastq])

batch_service_client.task.add_collection(job_id, tasks)

6. taskの完了を待つ

timeout_expiration = datetime.datetime.now() + timeout
while datetime.datetime.now() < timeout_expiration:
  tasks = batch_service_client.task.list(job_id)
  incomplete_tasks = [task for task in tasks if task.state != batchmodels.TaskState.completed]
  if not incomplete_tasks:
    return True
  else:
    time.sleep(1)

自作したpython_bwa_task.pyの中身

# モジュールのimport
from __future__ import print_function
import argparse
import collections
import os
import string
import subprocess
import sys

import azure.storage.blob as azureblob

if __name__ == '__main__':

    # 引数の処理
    parser = argparse.ArgumentParser()
    parser.add_argument('--storageaccount')
    parser.add_argument('--storagecontainer')
    parser.add_argument('--sastoken')
    parser.add_argument('--bwapath', required=True)
    parser.add_argument('--fastq1', required=True)
    parser.add_argument('--fastq2', required=True)
    parser.add_argument('--refgenome', required=True)
    parser.add_argument('--samplename', required=True)
    args = parser.parse_args()

    bwa = os.path.realpath(args.bwapath)
    fastq1 = os.path.realpath(args.fastq1)
    fastq2 = os.path.realpath(args.fastq2)
    ref_fa = os.path.realpath(args.refgenome)
    samplename = args.samplename

    output_sam = samplename +'.sam'
    error_log = samplename +'.error.log'

    # bwaを実行する
    sam =  open(output_sam, 'w')
    error =  open(error_log, 'w')
    proc = subprocess.Popen([bwa, 'mem', ref_fa, fastq1, fastq2], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output = proc.stdout
    for line in output:
        sam.write(line)
    output = proc.stderr
    for line in output:
        error.write(line)
    sam.close()
    error.close()

    # blobにアクセスするためのインスタンスを生成
    blob_client = azureblob.BlockBlobService(account_name=args.storageaccount,
                                             sas_token=args.sastoken)
    # 結果をアップロードする
    output_sam_path = os.path.realpath(output_sam)
  blob_client.create_blob_from_path(args.storagecontainer,
                                      output_sam,
                                      output_sam_path)
    error_log_path = os.path.realpath(error_log)
    blob_client.create_blob_from_path(args.storagecontainer,
                                      error_log,
                                      error_log_path)

 

使ってみて感想

Pythonで記載されたAzure Batchを動かすサンプルがあったので便利。

1検体につき、1VMインスタンスが割り当ててほしいが、Auto Scaleがそのように、割り当ててくれるか検証が必要。

Docker Swarmを使用できるとのことなので、やってみなくてはならない

 

参考

github.com