
Hello, I'm @shinyorke (pronounced "shin-yorke"), an SRE in LayerX's Ai Workforce division.
These days I'm building the data platform for Ai Workforce, and in my personal time I'm working hard on an AI agent that does baseball commentary.
In this post, I'll write about how we chose our ELT tool, one of the components of Ai Workforce's data platform.
In particular, I'd like to focus on:
- Why we considered building and implementing it with a managed service (Azure Data Factory, a.k.a. ADF), and why we gave up on it
- How we came to choose dlt + Container App Job as the alternative to ADF, and how it actually went
- The pitfalls we hit when working against an Azure Cosmos DB for PostgreSQL Read Replica, and how we dealt with them
The following are out of scope for this post:
- The specifics of building and operating the data platform
- What we do with the data
- Anything about specific Ai Workforce features or business operations
TL;DR
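- dlt (data load tool)'s strength is the simplicity of writing everything in Python. Combined with nothing more than SQLAlchemy, it covers PostgreSQL → Parquet → Azure Blob end to end, and it pairs well with Container App Job.
- ADF + Self-hosted IR carries high operational complexity. Adopting it in a private-connectivity environment takes real commitment.
- Be careful when integrating via a Cosmos DB for PostgreSQL Read Replica. WAL conflicts with server-side cursors, the need for NullPool, OOM countermeasures: there are several points you only notice by actually trying.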
Contents
- TL;DR
- Contents
- We considered a managed service (ADF) but passed on it
- What is dlt?
- Architecture overview
- Implementation details and pitfalls
- How it turned out
- Wrap-up
We considered a managed service (ADF) but passed on it
The initial design was to implement this with Azure Data Factory (ADF) + a Self-hosted Integration Runtime (SHIR).
ADF is Azure's native data integration service. You can build pipelines in a GUI, and it has a rich set of enterprise features.
At first I thought this might work, but the plan fell apart when we tried to meet the network requirement of keeping everything private (an "L4-level" requirement).
Using ADF under this requirement (a private-connectivity environment) means standing up a component called a Self-hosted IR (SHIR) yourself. Because of the issues below, we decided to pass on it.
| Issue | Details |
|---|---|
| Managed PE constraint | ADF's Managed PE could not be used, so connections had to go through SHIR, which complicated the connection design |
| Number of components | ADF / SHIR / Linked Service / Pipeline: many things to manage, which makes fault isolation hard |
| SHIR state management | SHIR cannot run without a VM, so the VM must be always on. Many failure points, such as pinning image tags and control-plane reachability |
| Running cost | The SHIR VM is always on, so costs keep accruing |
The clear knockout factor was the Managed PE (Private Endpoint) constraint. Because that wasn't possible, we were stuck.
In short, we ended up in a state of "all we want is a simple daily batch, yet there's far too much to maintain."
Honestly, my impression was that the implementation path was too heavy for our requirement: "a loosely coupled setup centred on Blob dumps, which the DWH references."
So, after a PoC, we decided to retire ADF + SHIR and replace it with dlt + Container App Job.
What is dlt?
dlt (data load tool) is an open-source ETL library you write in Python.
In a word, it's "a tool that lets you write data pipelines nicely in Python."
Its main characteristics are:
- It's an open-source Python library for moving and loading data simply.
- It doesn't require any particular backend environment, so it can be embedded flexibly as ordinary Python code, including in container environments.
- It can read data from sources such as CSV files on Amazon S3 or various REST APIs, and save it directly as organized datasets into databases and data warehouses such as DuckDB and Snowflake.
What we wanted to do this time was:
- Export data from Azure Cosmos DB for PostgreSQL to Blob Storage in Parquet format
- Run that process daily against a Read Replica
- Package it as a Docker container, since we wanted to run it on Azure Container Apps
So dlt was exactly the right fit*1.
The setup combines the following libraries:
| Library | Role |
|---|---|
| dlt[filesystem,parquet,az] | Extraction, load control, Parquet output, writing to Azure Blob |
| sqlalchemy + psycopg | PostgreSQL connection |
| pyarrow | Parquet conversion |
| adlfs | Azure Blob Storage filesystem |
Architecture overview
The overall architecture is as follows.

We chose Container App Job because its start → run → exit cycle fits a daily batch naturally. Unlike SHIR, nothing has to stay running, and resources are consumed only during execution. That keeps both cost and operations simple.
The file layout on Blob Storage looks like this:
snowflake-stage/
  raw/
    <table_name>/
      <load_id>.<hash>.parquet  ← old files are deleted before each run, then new files are written
    _dlt_loads/
    _dlt_pipeline_state/
It's a simple design in which Snowflake always references "the latest full copy." There are no date partitions.
With this setup, we got everything running smoothly.
Implementation details and pitfalls
These are nitpicky tips.
Working against a Cosmos DB for PostgreSQL Read Replica comes with its own peculiar pitfalls.
NullPool is a must
By default, dlt fetches multiple tables in parallel. With SQLAlchemy's default connection pool, QueuePool (pool_size=5, max_overflow=10), the parallel fetches exceed the pool's connection limit and time out.
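To make that failure mode concrete, here is a toy model. It is not SQLAlchemy code, and `ToyBoundedPool` and `count_timeouts` are hypothetical names. QueuePool allows at most pool_size + max_overflow = 5 + 10 = 15 connections to be checked out at once. Any further checkout waits up to pool_timeout (30 seconds by default) and then fails. The sketch reproduces only that "wait, then time out" behaviour, using a semaphore and threads as stand-ins for connections and parallel table fetches.

```python
import threading
import time


class ToyBoundedPool:
    """Hypothetical stand-in for a pool that allows at most
    pool_size + max_overflow checkouts at once (not SQLAlchemy code)."""

    def __init__(self, pool_size: int = 5, max_overflow: int = 10, timeout: float = 30.0) -> None:
        self.capacity = pool_size + max_overflow
        self.timeout = timeout
        self._slots = threading.BoundedSemaphore(self.capacity)

    def checkout(self) -> None:
        # Wait up to `timeout` seconds for a free slot, then give up.
        if not self._slots.acquire(timeout=self.timeout):
            raise TimeoutError(
                f"pool limit of {self.capacity} reached, timed out after {self.timeout}s"
            )

    def checkin(self) -> None:
        self._slots.release()


def count_timeouts(pool: ToyBoundedPool, workers: int, hold_seconds: float) -> int:
    """Start `workers` threads that each hold one slot for `hold_seconds`
    and return how many of them could not get a slot in time."""
    timeouts = 0
    lock = threading.Lock()

    def worker() -> None:
        nonlocal timeouts
        try:
            pool.checkout()
        except TimeoutError:
            with lock:
                timeouts += 1
            return
        try:
            time.sleep(hold_seconds)  # simulate a long-running table fetch
        finally:
            pool.checkin()

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return timeouts


if __name__ == "__main__":
    # Shrunk numbers so the demo finishes quickly: 2 slots, 3 parallel workers.
    small = ToyBoundedPool(pool_size=1, max_overflow=1, timeout=0.1)
    print(count_timeouts(small, workers=3, hold_seconds=0.5))
```

With the real defaults, the 16th concurrent fetch is the one that waits and eventually errors out.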
解決çï¼NullPool ã使ã
from sqlalchemy.pool import NullPool
import sqlalchemy as sa
engine = sa.create_engine(pg_url, poolclass=NullPool)
NullPool creates an independent physical connection on every connect() and disconnects immediately on close(). For a batch job that has no need to reuse connections, this is the simplest and most reliable option.
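The "new connection per use, closed right away" behaviour is easy to picture with a plain context manager. This sketch is not SQLAlchemy. It uses the standard-library sqlite3 module as a stand-in, an assumption made purely so it runs anywhere, and `fresh_connection` is a hypothetical helper. Each `with` block gets its own brand-new connection, which is closed as soon as the block ends. That is the same lifecycle NullPool gives every connect()/close() pair.

```python
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager


@contextmanager
def fresh_connection(database: str) -> Iterator[sqlite3.Connection]:
    """Hypothetical helper: open a brand-new connection for each `with`
    block and close it on exit, so nothing is kept around for reuse."""
    conn = sqlite3.connect(database)
    try:
        yield conn
    finally:
        conn.close()  # physically close right away instead of returning it to a pool


if __name__ == "__main__":
    with fresh_connection(":memory:") as first:
        print(first.execute("SELECT 1").fetchone())
    with fresh_connection(":memory:") as second:
        print(first is second)
```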
WAL conflicts on the Read Replica
The next pitfall is the conflict between server-side cursors and WAL*2.
With dlt + SQLAlchemy, it's tempting to use stream_results=True to stream large volumes of data (equivalent to psycopg3's ServerCursor / DECLARE CURSOR). However, using it against a Cosmos DB for PostgreSQL Read Replica causes the SSL connection to be forcibly closed:
OperationalError: SSL connection closed unexpectedly
The cause is a conflict with WAL replay on the Read Replica. Because this is a managed service, the parameters we can tune are limited, so we had to work around it on the client side (inside the dlt application)*3.
Solution: use only yield_per
stmt = sa.select(table).execution_options(yield_per=10000)
Specifying yield_per lets you fetch results in chunks, equivalent to fetchmany(), without using DECLARE CURSOR. This avoids the WAL conflict while keeping memory usage efficient.
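Chunk-by-chunk retrieval follows the DB-API `fetchmany()` pattern. The minimal sketch below uses the standard-library sqlite3 module as a stand-in for the replica, an assumption made only so it runs anywhere. `iter_chunks` is a hypothetical helper, and `chunk_size` defaults to 10,000 to mirror yield_per=10000. How much of the result a driver buffers on the client before fetchmany() is called depends on the driver and cursor type, so memory is worth measuring against the real replica.

```python
import sqlite3
from collections.abc import Iterator


def iter_chunks(
    conn: sqlite3.Connection, query: str, chunk_size: int = 10_000
) -> Iterator[list[tuple]]:
    """Hypothetical helper: run `query` once and yield its rows in lists of
    at most `chunk_size` rows via the DB-API fetchmany() call."""
    cur = conn.cursor()
    try:
        cur.execute(query)
        while True:
            rows = cur.fetchmany(chunk_size)
            if not rows:  # an empty list means the result set is exhausted
                break
            yield rows
    finally:
        cur.close()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY)")
    conn.executemany("INSERT INTO events (id) VALUES (?)", [(i,) for i in range(25)])
    for chunk in iter_chunks(conn, "SELECT id FROM events ORDER BY id", chunk_size=10):
        print(len(chunk), chunk[0], chunk[-1])
    conn.close()
```

Each chunk can be handed to the writer and dropped before the next one is fetched, so the consumer never holds the whole table in memory at once.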
OOM countermeasures: batching by table + child processes
Even with yield_per, each table is still read through sequentially in its entirety. When tables are large or numerous, process memory keeps piling up until an OOM (Out of Memory) error occurs.
To prevent this, we combined the following three measures:
1. workers=1 (limit the number of tables processed concurrently to one)
pipeline.extract(source, write_disposition="replace", workers=1)
2. Splitting into batches with BATCH_COUNT
All tables are split into N batches, which are processed one after another. N is controlled by the environment variable BATCH_COUNT (default 5).
3. Child processes via multiprocessing's spawn start method
import multiprocessing
ctx = multiprocessing.get_context("spawn")
proc = ctx.Process(target=run_batch, args=(batch,))
proc.start()
proc.join()
engine.dispose() releases connections, but it does not release the Python heap. Spawning a child process and letting it exit reliably reclaims that RAM at the OS level.
Using spawn rather than fork is also key. With fork, the child inherits the parent process's memory, which makes it an incomplete OOM countermeasure.
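Steps 2 and 3 can be sketched together as follows. This is a minimal sketch under stated assumptions, not the job's actual code:

- `load_batch_count`, `split_into_batches`, `run_batch` and `run_all_batches` are hypothetical names.
- Splitting tables into contiguous, near-equal batches is an assumption; the post doesn't say how tables are assigned.
- `run_batch` only validates table names, where the real job would create its engine and run the dlt pipeline.

```python
import multiprocessing
import os
import sys


def load_batch_count(default: int = 5) -> int:
    """Hypothetical: read BATCH_COUNT, falling back to `default` when it is
    unset, non-numeric, or not positive."""
    try:
        value = int(os.environ.get("BATCH_COUNT", ""))
    except ValueError:
        return default
    return value if value > 0 else default


def split_into_batches(tables: list[str], batch_count: int) -> list[list[str]]:
    """Hypothetical: split `tables` into at most `batch_count` contiguous,
    near-equal batches, dropping empty ones when tables < batches."""
    size, remainder = divmod(len(tables), batch_count)
    batches: list[list[str]] = []
    start = 0
    for i in range(batch_count):
        end = start + size + (1 if i < remainder else 0)
        if end > start:
            batches.append(tables[start:end])
        start = end
    return batches


def run_batch(tables: list[str]) -> None:
    """Hypothetical child entry point. The real job would build the engine
    and run the dlt pipeline here; this stand-in only checks the names."""
    for name in tables:
        if not name:
            sys.exit(1)  # non-zero exit code tells the parent this batch failed
    # Returning normally gives exit code 0; all memory this process
    # allocated goes back to the OS when it terminates.


def run_all_batches(batches: list[list[str]]) -> list[int]:
    """Run each batch in its own freshly spawned process, one at a time."""
    ctx = multiprocessing.get_context("spawn")
    exit_codes: list[int] = []
    for batch in batches:
        proc = ctx.Process(target=run_batch, args=(batch,))
        proc.start()
        proc.join()  # wait, so only one batch is in memory at any moment
        exit_codes.append(proc.exitcode)
    return exit_codes


if __name__ == "__main__":
    # Hypothetical table names, for illustration only.
    tables = [f"table_{i}" for i in range(12)]
    codes = run_all_batches(split_into_batches(tables, load_batch_count()))
    print(codes)
    if any(code != 0 for code in codes):
        sys.exit(1)  # exit non-zero so a failed batch surfaces as a failed run
```

Because spawn starts a fresh interpreter that re-imports the main module, the code that starts processes must sit under `if __name__ == "__main__":`. Checking each child's exitcode lets the parent exit non-zero, so one failed batch is not silently swallowed.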
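How it turned out
My honest impression: "it came out simple, at least on the infrastructure side (though dlt is doing quite a bit of fiddly work for us under the hood)."
With ADF + SHIR we were in a state of "so many settings that you lose track of what lives where." With dlt, we got to "look at the Python file and you can see everything it does."
On the other hand, as mentioned at the start, the Cosmos DB for PostgreSQL Read Replica has its own peculiar constraints, with pitfalls you can't discover just by reading the documentation. The WAL conflict and the NullPool issue in particular went unnoticed until we actually ran the job and saw the errors.
I also regret that the implementation and configuration needed to work around Cosmos DB for PostgreSQL's constraints became rather convoluted. We've captured as much of it as we can in documentation and tests, but we still need to take care that it doesn't turn into technical debt.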
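Wrap-up
In this post, I covered the following:
- dlt (data load tool)'s strength is the simplicity of writing everything in Python. Combined with nothing more than SQLAlchemy, it covers PostgreSQL → Parquet → Azure Blob end to end, and it pairs well with Container App Job.
- ADF + Self-hosted IR carries high operational complexity. Adopting it in a private-connectivity environment takes real commitment.
- Be careful when integrating via a Cosmos DB for PostgreSQL Read Replica. WAL conflicts with server-side cursors, the need for NullPool, OOM countermeasures: there are several points you only notice by actually trying.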
I'd be glad if this helps anyone who is "looking for an ETL tool they can write in Python," "torn over whether to adopt ADF," or "building a data platform on Cosmos DB for PostgreSQL."
To be clear, in this case ADF and Cosmos DB for PostgreSQL simply weren't a good fit for our use case. This is not a judgment on the merits of either service or product*4.
The Ai Workforce SRE team will keep improving the data platform.
Next up are building the Snowflake environment and setting up an analytics layer with dbt. I'd like to write about those here when I get the chance.
We're also hiring SREs and data engineers, so if you're interested, please reach out, whether for a casual chat or anything else!
Thank you for reading to the end.
*1: To put it in slightly older terms, the idea is doing what you'd once have used Embulk for, but in Python.
*2: Write-Ahead Log.
*3: On regular PostgreSQL this could be tuned with settings such as max_standby_streaming_delay. In Azure Cosmos DB for PostgreSQL these parameters can't be set, and we also had the constraint of not wanting to affect the primary (master) DB.
*4: We ended up not using ADF. Cosmos DB for PostgreSQL itself, however, has been working nicely for us since Ai Workforce launched, so I think it's a good product.