ããã»ã¹çæãšç®¡çã®åºæ¬ããå¿çšãŸã§
ã¯ããã«ïŒsubprocessã¢ãžã¥ãŒã«ãšã¯ïŒ ð€
Pythonã䜿ã£ãŠã·ã¹ãã 管çã¿ã¹ã¯ãä»ã®ããã°ã©ã ãšã®é£æºãè¡ãéãå€éšã®ã³ãã³ããããã»ã¹ãå®è¡ãããå Žé¢ã¯éåžžã«å€ããããŸããäŸãã°ãã·ã§ã«ã®ã³ãã³ããå®è¡ããŠãã¡ã€ã«æäœãè¡ã£ãããå¥ã®èšèªã§æžãããããã°ã©ã ãå®è¡ããŠãã®çµæãååŸãããããšãã£ãã±ãŒã¹ã§ãã
ãã®ãããªå€éšããã»ã¹ãšã®é£æºãå®çŸããããã«ãPythonã®æšæºã©ã€ãã©ãªãšããŠæäŸãããŠããã®ã subprocess
ã¢ãžã¥ãŒã«ã§ãããã®ã¢ãžã¥ãŒã«ã䜿ãããšã§ãæ°ããããã»ã¹ã®çæããã®ããã»ã¹ãžã®å
¥åãåºåã®ååŸãçµäºã¹ããŒã¿ã¹ã®ç¢ºèªãªã©ãç°¡åãã€å®å
šã«è¡ãããšãã§ããŸãã
ãã€ãŠã¯ os.system()
ã os.spawn*()
ãos.popen*()
ãšãã£ãé¢æ°ãå©çšãããŠããŸãããããããã¯æ©èœãéå®çã§ãã£ãããã»ãã¥ãªãã£äžã®åé¡ãæ±ããŠãããããŸãããsubprocess
ã¢ãžã¥ãŒã«ã¯ããããã®æ§æ¥ã®æ¹æ³ã眮ãæããããæè»ã§åŒ·åãªããã»ã¹ç®¡çæ©èœãæäŸããããã«å°å
¥ãããŸããïŒç¹ã«Python 2.4以éïŒã
ãã®ããã°èšäºã§ã¯ãsubprocess
ã¢ãžã¥ãŒã«ã®åºæ¬çãªäœ¿ãæ¹ãããæšæºå
¥åºåã®ãªãã€ã¬ã¯ãããšã©ãŒãã³ããªã³ã°ããã€ãã©ã€ã³åŠçãéåæå®è¡ããããŠéèŠãªã»ãã¥ãªãã£ã«é¢ãã泚æç¹ãŸã§ãå¹
åºãæãäžããŠè§£èª¬ããŠãããŸãããããsubprocess
ã®äžçãæ¢æ€ããŸãããïŒ ð
åºæ¬çãªäœ¿ãæ¹ïŒsubprocess.run() é¢æ° âš
subprocess
ã¢ãžã¥ãŒã«ã§æãæšå¥šãããäžè¬çã«äœ¿ãããã®ã subprocess.run()
é¢æ°ã§ããããã¯Python 3.5ã§å°å
¥ãããå€éšã³ãã³ããå®è¡ãããã®å®äºãåŸ
ã€ããã®ã·ã³ãã«ãã€åŒ·åãªã€ã³ã¿ãŒãã§ãŒã¹ãæäŸããŸãã
åºæ¬çãªæ§æã¯ä»¥äžã®ããã«ãªããŸãã
import subprocess
# ã³ãã³ããšåŒæ°ããªã¹ãã§æå®
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
# å®è¡çµæã®ç¢ºèª
print("Return code:", result.returncode)
print("Stdout:", result.stdout)
print("Stderr:", result.stderr)
äžèšã®äŸã§ã¯ãls -l
ã³ãã³ããå®è¡ããŠããŸããéèŠãªãã€ã³ãã¯ä»¥äžã®éãã§ãã
- ã³ãã³ããšåŒæ°: å®è¡ãããã³ãã³ããšãã®åŒæ°ã¯ãæååã®ãªã¹ãïŒãŸãã¯ã¿ãã«ïŒãšããŠ
run()
é¢æ°ã®ç¬¬äžåŒæ° (args
) ã«æž¡ãã®ãåºæ¬ã§ããããã¯åŸè¿°ããã»ãã¥ãªãã£ã®èŠ³ç¹ãããæšå¥šãããæ¹æ³ã§ãã capture_output=True
: ãã®åŒæ°ãæå®ãããšãã³ãã³ãã®æšæºåºå (stdout) ãšæšæºãšã©ãŒåºå (stderr) ããã£ããã£ãããæ»ãå€ã®ãªããžã§ã¯ãããã¢ã¯ã»ã¹ã§ããããã«ãªããŸããæå®ããªãå Žåãåºåã¯Pythonã¹ã¯ãªãããå®è¡ããŠããã¿ãŒããã«ãªã©ã«ãã®ãŸãŸè¡šç€ºãããçµæãªããžã§ã¯ãã«ã¯å«ãŸããŸãããtext=True
: Python 3.7以éã§å©çšå¯èœã§ããïŒãã以åã¯universal_newlines=True
ïŒããããæå®ãããšãstdin, stdout, stderr ãããã¹ãã¢ãŒãã§æ±ãããèªåçã«ã·ã¹ãã ã®ããã©ã«ããšã³ã³ãŒãã£ã³ã°ã§ãã³ãŒãã»ãšã³ã³ãŒããããŸããçµæãšããŠãresult.stdout
ãresult.stderr
ããã€ãå (bytes
) ã§ã¯ãªãæåå (str
) ã«ãªããŸããããã«ãããããã¹ãããŒã¿ã®æ±ããéåžžã«æ¥œã«ãªããŸã ðãæå®ããªãå Žåã¯ãã€ãåãšããŠæ±ãããŸãã- æ»ãå€:
run()
é¢æ°ã¯CompletedProcess
ãªããžã§ã¯ããè¿ããŸãããã®ãªããžã§ã¯ãã«ã¯ãå®è¡ãããã³ãã³ããåŒæ° (args
)ãçµäºã³ãŒã (returncode
)ããã£ããã£ãããæšæºåºå (stdout
)ãæšæºãšã©ãŒåºå (stderr
) ãªã©ãå«ãŸããŸãã - çµäºã³ãŒã (
returncode
): ã³ãã³ããæ£åžžã«çµäºããå Žåã¯éåžž0
ã«ãªããŸãããšã©ãŒãçºçããå Žåã¯0
以å€ã®å€ïŒéåžžã¯æ£ã®æŽæ°ïŒãè¿ãããŸãã
run() é¢æ°ã®äž»èŠãªåŒæ°
run()
é¢æ°ã«ã¯å€ãã®äŸ¿å©ãªåŒæ°ããããŸããããã€ãéèŠãªãã®ã玹ä»ããŸãã
åŒæ° | 説æ |
---|---|
args | å®è¡ããã³ãã³ããšåŒæ°ãéåžžã¯æååã®ãªã¹ããå¿ é ã |
stdin , stdout , stderr | åããã»ã¹ã®æšæºå
¥åãæšæºåºåãæšæºãšã©ãŒåºåã®åŠçæ¹æ³ãæå®ããŸããsubprocess.PIPE , subprocess.DEVNULL , ãã¡ã€ã«ãã£ã¹ã¯ãªãã¿ããã¡ã€ã«ãªããžã§ã¯ããªã©ãæå®ã§ããŸãã詳现ã¯åŸè¿°ã |
capture_output | True ã«ãããš stdout=subprocess.PIPE ãš stderr=subprocess.PIPE ãåæã«èšå®ããã®ãšåãå¹æããããŸãã |
text | True ã«ãããšãstdin, stdout, stderr ãããã¹ãã¢ãŒãã§åŠçãããŸãïŒãšã³ã³ãŒãã£ã³ã°ã¯ encoding ã errors åŒæ°ã§æå®å¯èœïŒããã€ãåã§ã¯ãªãæååãšããŠæ±ãããå Žåã«äŸ¿å©ã§ãã |
check | True ã«ãããšãã³ãã³ãã®çµäºã³ãŒãã 0 ã§ãªãå Žåã« CalledProcessError äŸå€ãçºçããŸãããšã©ãŒãã³ããªã³ã°ã«åœ¹ç«ã¡ãŸãã詳现ã¯åŸè¿°ã |
shell | True ã«ãããšãã³ãã³ããã·ã¹ãã ã®ã·ã§ã«ïŒäŸ: Linuxã® /bin/sh , Windowsã® cmd.exe ïŒãçµç±ããŠå®è¡ããŸããããã«ãããã·ã§ã«ã®æ©èœïŒãã€ããã¯ã€ã«ãã«ãŒããç°å¢å€æ°å±éãªã©ïŒãå©çšã§ããŸãããé倧ãªã»ãã¥ãªãã£ãªã¹ã¯ïŒã·ã§ã«ã€ã³ãžã§ã¯ã·ã§ã³ïŒã䌎ããŸããé垞㯠False ã®ãŸãŸïŒããã©ã«ãïŒã«ããã³ãã³ããšåŒæ°ã¯ãªã¹ãã§æž¡ãããšã匷ãæšå¥šãããŸãã詳现ã¯åŸè¿°ã |
cwd | ã³ãã³ããå®è¡ããã¯ãŒãã³ã°ãã£ã¬ã¯ããªãæå®ããŸããããã©ã«ãã¯èŠªããã»ã¹ïŒPythonã¹ã¯ãªããïŒã®ã«ã¬ã³ããã£ã¬ã¯ããªã§ãã |
timeout | ã³ãã³ãã®å®è¡æéå¶éïŒç§ïŒãæå®ããŸããæå®ããæéãè¶
ããŠãã³ãã³ããçµäºããªãå ŽåãTimeoutExpired äŸå€ãçºçããŸãã |
env | åããã»ã¹ã«èšå®ããç°å¢å€æ°ãèŸæžã§æå®ããŸããæå®ããªãå Žåã芪ããã»ã¹ã®ç°å¢å€æ°ãåŒãç¶ãããŸãã |
æšæºå ¥åºåã®ãªãã€ã¬ã¯ã ð
subprocess
ã®åŒ·åãªæ©èœã®äžã€ããåããã»ã¹ã®æšæºå
¥å (stdin)ãæšæºåºå (stdout)ãæšæºãšã©ãŒåºå (stderr) ãæè»ã«å¶åŸ¡ã§ããç¹ã§ãããã㯠run()
é¢æ°ã® stdin
, stdout
, stderr
åŒæ°ã䜿ã£ãŠè¡ããŸãã
æšæºåºå (stdout) ãšæšæºãšã©ãŒåºå (stderr) ã®ãã£ããã£
æ¢ã«èŠãããã«ãcapture_output=True
ãŸã㯠stdout=subprocess.PIPE
, stderr=subprocess.PIPE
ãæå®ããããšã§ãåããã»ã¹ã®åºåãååŸã§ããŸãã
import subprocess
# stdoutãšstderrããã£ããã£ããŠæååãšããŠååŸ
result = subprocess.run(['cat', 'non_existent_file.txt'], capture_output=True, text=True)
# stdoutã¯ç©ºã®ã¯ã
print(f"Stdout: '{result.stdout}'")
# stderrã«ãšã©ãŒã¡ãã»ãŒãžãå«ãŸãã
print(f"Stderr: '{result.stderr}'")
print(f"Return code: {result.returncode}") # 0以å€ã«ãªãã¯ã
æšæºå ¥å (stdin) ãžã®ããŒã¿éä¿¡
åããã»ã¹ã«ããŒã¿ãå
¥åãšããŠæž¡ãããå Žåã¯ãrun()
é¢æ°ã® input
åŒæ°ã䜿çšããŸãããã®å Žåãstdin=subprocess.PIPE
ãæ瀺çã«æå®ããå¿
èŠã¯ãããŸãã (input
ã䜿ããšèªåçã«èšå®ãããŸã)ã
input
ã«æž¡ãããŒã¿ã¯ãtext=True
ã®å Žåã¯æååãtext=False
(ããã©ã«ã) ã®å Žåã¯ãã€ãåã§ããå¿
èŠããããŸãã
import subprocess
# grepã³ãã³ãã«æšæºå
¥åããããŒã¿ãæž¡ã
input_data = "line1\nline2 with keyword\nline3\nline4 with keyword too"
result = subprocess.run(
['grep', 'keyword'],
input=input_data,
capture_output=True,
text=True # inputãtextãšããŠæ±ããã
)
print("Grep Return code:", result.returncode)
print("Grep Stdout:\n", result.stdout)
print("Grep Stderr:", result.stderr)
åºåã®ãªãã€ã¬ã¯ãå
stdout
ã stderr
ã«ã¯ subprocess.PIPE
以å€ãæå®ã§ããŸãã
subprocess.DEVNULL
: åºåãå®å šã«ç Žæ£ããŸãããã°ãªã©ãæå¶ãããå Žåã«äŸ¿å©ã§ãã- æ¢åã®ãã¡ã€ã«ãã£ã¹ã¯ãªãã¿: äŸãã°
sys.stdout
ãsys.stderr
ãæå®ãããšã芪ããã»ã¹ãšåãåºåå ã«æžãåºãããŸãïŒããã©ã«ãã®åäœã«è¿ãã§ãããæ瀺çã«æå®ããããšãå¯èœã§ãïŒã - ãã¡ã€ã«ãªããžã§ã¯ã:
open()
ã§éãããã¡ã€ã«ãªããžã§ã¯ããæå®ãããšããã®ãã¡ã€ã«ã«åºåãæžã蟌ãŸããŸãã
import subprocess
import sys
# stderrããã¡ã€ã«ã«ãªãã€ã¬ã¯ã
try:
with open('error.log', 'w') as f_err:
result = subprocess.run(
['cat', 'non_existent_file.txt'],
stdout=subprocess.PIPE, # stdoutã¯ãã£ããã£
stderr=f_err, # stderrã¯ãã¡ã€ã«ãž
text=True
)
print("Command executed.")
# error.logã確èªãããšãšã©ãŒã¡ãã»ãŒãžãæžã蟌ãŸããŠããã¯ã
except FileNotFoundError:
print("Error: 'cat' command not found.")
except Exception as e:
print(f"An error occurred: {e}")
# stdoutãç Žæ£ããstderrã¯èŠªããã»ã¹ãšåãã«ãã
try:
result_devnull = subprocess.run(
['ls', '/'],
stdout=subprocess.DEVNULL, # æšæºåºåã¯è¡šç€ºããªã
stderr=sys.stderr, # æšæºãšã©ãŒã¯ã¿ãŒããã«ã«åºå
text=True
)
print(f"\nls command return code: {result_devnull.returncode}")
except FileNotFoundError:
print("Error: 'ls' command not found.")
except Exception as e:
print(f"An error occurred: {e}")
stdout ãš stderr ã®ããŒãž
æšæºåºåãšæšæºãšã©ãŒåºåãåºå¥ãããåãã¹ããªãŒã ã§æ±ãããå ŽåããããŸãããã®å Žåã¯ãstderr=subprocess.STDOUT
ãæå®ããŸããããã«ãããstderr ã stdout ãšåããã³ãã«ã«ãªãã€ã¬ã¯ããããŸãã
import subprocess
# findã³ãã³ãã§ãèŠã€ãã£ããã¡ã€ã«(stdout)ãšæš©éãšã©ãŒ(stderr)ãããŒãžããŠãã£ããã£
try:
result = subprocess.run(
['find', '/', '-name', 'python*', '-print'], # '-print' ã¯ãªããŠãè¯ãå Žåãå€ã
stdout=subprocess.PIPE, # stdoutããã£ããã£
stderr=subprocess.STDOUT, # stderrãstdoutã«ããŒãž
text=True
)
print("Find Return code:", result.returncode) # ãšã©ãŒããã£ãŠã0ã«ãªãããšãããã®ã§æ³šæ
print("Combined Output:\n", result.stdout)
# result.stderr 㯠None ã«ãªã
print("Stderr:", result.stderr)
except FileNotFoundError:
print("Error: 'find' command not found.")
except Exception as e:
print(f"An error occurred: {e}")
ãšã©ãŒãã³ããªã³ã° ðš
å€éšã³ãã³ããå®è¡ããéã«ã¯ãæ§ã ãªãšã©ãŒãçºçããå¯èœæ§ããããŸããã³ãã³ããèŠã€ãããªããå®è¡æš©éããªããã³ãã³ãèªäœããšã©ãŒãè¿ããã¿ã€ã ã¢ãŠããããªã©ã§ãããããã®ãšã©ãŒã«é©åã«å¯ŸåŠããããšãéèŠã§ãã
çµäºã³ãŒãã®ç¢ºèª
æãåºæ¬çãªãšã©ãŒãã§ãã¯ã¯ãCompletedProcess
ãªããžã§ã¯ãã® returncode
å±æ§ã確èªããããšã§ããéåžžã0
ã¯æåãæå³ãããã以å€ã®å€ã¯ãšã©ãŒã瀺ããŸãã
import subprocess
result = subprocess.run(['false']) # 'false' ã³ãã³ãã¯åžžã«éãŒãã®çµäºã³ãŒããè¿ã
if result.returncode != 0:
print(f"Command failed with return code: {result.returncode}")
else:
print("Command succeeded.")
check=True ã«ããäŸå€çºç
ã³ãã³ãã倱æããå ŽåïŒçµäºã³ãŒãã 0
ã§ãªãå ŽåïŒã«ãèªåçã«äŸå€ãçºçããããå Žåã¯ãrun()
é¢æ°ã« check=True
ãæå®ããŸããããã«ãããCalledProcessError
äŸå€ãéåºãããŸãã
CalledProcessError
äŸå€ãªããžã§ã¯ãã¯ãreturncode
, cmd
, output
(stdout
), stderr
å±æ§ãæã£ãŠããããšã©ãŒã®è©³çŽ°ã確èªã§ããŸãã
import subprocess
try:
# ååšããªããã¡ã€ã«ãæå®ããŠãšã©ãŒãçºçããã
result = subprocess.run(['ls', 'non_existent_directory'], check=True, capture_output=True, text=True)
print("Command succeeded (should not happen).")
print("Stdout:", result.stdout)
except FileNotFoundError as e:
print(f"Error: Command not found - {e}") # lsã³ãã³ãèªäœãèŠã€ãããªãå Žå
except subprocess.CalledProcessError as e:
print(f"Command failed!")
print(f" Command: {e.cmd}")
print(f" Return code: {e.returncode}")
print(f" Stdout: {e.stdout}") # capture_output=Trueãªããã£ããã£ãããŠãã
print(f" Stderr: {e.stderr}") # capture_output=Trueãªããã£ããã£ãããŠãã
except Exception as e:
print(f"An unexpected error occurred: {e}")
check=True
ã¯ãã³ãã³ããæåããããšãåæãšããŠããå Žåã«ã³ãŒããç°¡æœã«ããã®ã«åœ¹ç«ã¡ãŸãããšã©ãŒåŠçãæ瀺çã«è¡ãããå Žåã¯ãtry...except
ãããã¯ã§ CalledProcessError
ãææããŸãã
ã³ãã³ããèŠã€ãããªãå Žåã®ãšã©ãŒ
å®è¡ããããšããã³ãã³ãèªäœãååšããªãå ŽåãFileNotFoundError
ãçºçããŸããããã try...except
ã§ææããå¿
èŠããããŸãã
import subprocess
try:
# ååšããªãã§ãããã³ãã³ããå®è¡
subprocess.run(['non_existent_command_blah_blah'], check=True)
except FileNotFoundError as e:
print(f"Error: The command was not found: {e}")
except subprocess.CalledProcessError as e:
print(f"Command failed with return code: {e.returncode}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
ã¿ã€ã ã¢ãŠãåŠç
å€éšã³ãã³ããäºæããé·æéå®è¡ãããããã°ã©ã å
šäœãåæ¢ããŠããŸãã®ãé²ãããã«ãtimeout
åŒæ°ã䜿çšã§ããŸããæå®ããç§æ°å
ã«ã³ãã³ããçµäºããªãå ŽåãTimeoutExpired
äŸå€ãçºçããŸãã
import subprocess
try:
# 5ç§éã¹ãªãŒãããã³ãã³ãã3ç§ã®ã¿ã€ã ã¢ãŠãã§å®è¡
result = subprocess.run(['sleep', '5'], timeout=3)
print("Command finished within timeout (should not happen).")
except FileNotFoundError:
print("Error: 'sleep' command not found.")
except subprocess.TimeoutExpired as e:
print(f"Command timed out after {e.timeout} seconds!")
print(f" Command: {e.cmd}")
# ã¿ã€ã ã¢ãŠãããå Žåã§ããstdoutãstderrã«éšåçãªåºåãå«ãŸããŠããå¯èœæ§ããã
print(f" Stdout (partial): {e.stdout}")
print(f" Stderr (partial): {e.stderr}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
TimeoutExpired
äŸå€ãçºçããå Žåãåããã»ã¹ã¯ SIGKILL
ã·ã°ãã«ã§åŒ·å¶çµäºãããŸããäŸå€ãªããžã§ã¯ãã«ã¯ãã¿ã€ã ã¢ãŠãæç¹ãŸã§ã«ãã£ããã£ããã stdout
ã stderr
(ãã€ãå) ãå«ãŸããŠããããšããããŸãã
ãã€ãã©ã€ã³åŠç ð
ã·ã§ã«ã§ã¯ãããã³ãã³ãã®åºåãå¥ã®ã³ãã³ãã®å
¥åã«ç¹ãããã€ãã©ã€ã³ã(|
) ããã䜿ãããŸããsubprocess
ã¢ãžã¥ãŒã«ã䜿ã£ãŠããåæ§ã®åŠçãå®çŸã§ããŸããããã«ã¯ã䞻㫠subprocess.Popen
ã¯ã©ã¹ã䜿çšããŸãïŒrun()
ã§ãå¯èœã§ãããPopen
ã®æ¹ãããçŽæ¥çã§ãïŒã
Popen
㯠run()
ãšç°ãªããããã»ã¹ãéå§ããçŽåŸã«å¶åŸ¡ãè¿ããããã»ã¹ã®å®äºãåŸ
ã¡ãŸããïŒéåæçïŒãããã»ã¹ã®å®äºãåŸ
ã£ãããå
¥åºåãåŠçãããããã«ã¯ãè¿œå ã®ã¡ãœããåŒã³åºããå¿
èŠã§ãã
äŸãšããŠãls -l
ã®åºåã grep python
ã«ãã€ãã§ç¹ãåŠçãèããŠã¿ãŸãããã
import subprocess
try:
# 第äžã®ããã»ã¹ (ls -l) ãéå§
p1 = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE)
# 第äºã®ããã»ã¹ (grep python) ãéå§
# stdinã«ã¯ç¬¬äžã®ããã»ã¹ã®stdoutãæå®
p2 = subprocess.Popen(['grep', 'python'], stdin=p1.stdout, stdout=subprocess.PIPE, text=True)
# 第äžã®ããã»ã¹ã®stdoutãéããïŒp2ãEOFãåãåããããã«ïŒ
# ãããè¡ããªããšãp2ãp1ã®çµäºãåŸ
ã¡ç¶ããŠãããããã¯ããå¯èœæ§ããã
if p1.stdout:
p1.stdout.close()
# 第äºã®ããã»ã¹ã®åºåãååŸ
# communicate() ã¯ããã»ã¹ã®å®äºãåŸ
ã¡ãstdoutãšstderrãè¿ã
stdout, stderr = p2.communicate()
print("Pipeline Output:")
print(stdout)
# åããã»ã¹ã®çµäºã³ãŒãã確èªã§ãã (ãªãã·ã§ã³)
# communicate() ã®åŸã§ãªããã° wait() ã¯äžèŠãªããšãå€ã
# p1_rc = p1.wait()
p2_rc = p2.wait()
print(f"\nls return code: {p1.returncode}") # Popenãªããžã§ã¯ãã¯å®äºåŸãreturncodeãæã€
print(f"grep return code: {p2_rc}")
except FileNotFoundError as e:
print(f"Error: Command not found - {e}")
except Exception as e:
print(f"An error occurred: {e}")
ãã®äŸã®ãã€ã³ãã¯ä»¥äžã®éãã§ãã
Popen
ã䜿ã£ãŠåããã»ã¹ãéå§ããŸãã- æåã®ããã»ã¹ (
p1
) ã®stdout
ãsubprocess.PIPE
ã«èšå®ããŸãã - 次ã®ããã»ã¹ (
p2
) ã®stdin
ã«ãåã®ããã»ã¹ (p1
) ã®stdout
ãªããžã§ã¯ã (p1.stdout
) ãæå®ããŸãã p2
ãéå§ããåŸãp1.stdout.close()
ãåŒã³åºãããšãéèŠã§ããããã«ãããp1
ãçµäºããéã«ãã€ããéããããp2
ãå ¥åã®çµãã (EOF) ãæ€ç¥ã§ããããã«ãªããŸãããããå¿ãããšãp2
ãå ¥åãåŸ ã¡ç¶ããŠãããããã¯ããå¯èœæ§ããããŸãã- æåŸã®ããã»ã¹ã®
communicate()
ã¡ãœãããåŒã³åºããŠãæçµçãªåºåãååŸããããã»ã¹ã®å®äºãåŸ ã¡ãŸããcommunicate()
ã¯å éšã§å¿ èŠãªåŸ æ©åŠçãè¡ããããéåžžã¯åå¥ã«wait()
ãåŒã¶å¿ èŠã¯ãããŸããã
ãªããsubprocess.run()
ã§ããinput
åŒæ°ã䜿ã£ãŠãã€ãã©ã€ã³ã®ãããªåŠçãæš¡å£ã§ããŸãããäžéããã»ã¹ã®å¶åŸ¡ããšã©ãŒãã³ããªã³ã°ã¯ Popen
ã䜿ãæ¹ãæè»ã§ãã
import subprocess
try:
# ls -l ã®åºåãååŸ
p1_result = subprocess.run(['ls', '-l'], capture_output=True, text=True, check=True)
# grep ã« ls ã®åºåã input ãšããŠæž¡ã
p2_result = subprocess.run(
['grep', 'python'],
input=p1_result.stdout, # åã®ã³ãã³ãã®stdoutãæž¡ã
capture_output=True,
text=True,
check=True # grepãäœãèŠã€ããããªããŠããšã©ãŒã«ã¯ããªãå Žåã¯Falseã«ãã
)
print("Pipeline Output (using run):")
print(p2_result.stdout)
except FileNotFoundError as e:
print(f"Error: Command not found - {e}")
except subprocess.CalledProcessError as e:
print(f"Command failed in pipeline!")
print(f" Command: {e.cmd}")
print(f" Return code: {e.returncode}")
print(f" Stderr: {e.stderr}")
except Exception as e:
print(f"An error occurred: {e}")
éåæåŠçïŒPopenã¯ã©ã¹ã®æŽ»çš â³
subprocess.run()
ã¯ã³ãã³ããå®äºãããŸã§åŸ
æ©ããåæçãªé¢æ°ã§ããããããæéããããå¯èœæ§ã®ããã³ãã³ããå®è¡ãã€ã€ãPythonã¹ã¯ãªããåŽã§ä»ã®åŠçãç¶ãããå Žåããè€æ°ã®ããã»ã¹ã䞊è¡ããŠå®è¡ãããå ŽåããããŸãããã®ãããªéåæçãªåŠçã«ã¯ subprocess.Popen
ã¯ã©ã¹ã䜿çšããŸãã
Popen
㯠run()
ãšåæ§ã®åŒæ°ãå€ãåããŸãããã€ã³ã¹ã¿ã³ã¹åãããšããã«åããã»ã¹ãéå§ããå¶åŸ¡ãè¿ããŸãã
import subprocess
import time
print("Starting long-running process...")
# 5ç§ãããããã»ã¹ãéå§ (ããã¯ã°ã©ãŠã³ãã§å®è¡ããã)
process = subprocess.Popen(['sleep', '5'])
print(f"Process started with PID: {process.pid}. Continuing with other tasks...")
# ä»ã®åŠçãå®è¡
for i in range(3):
print(f"Doing other work... ({i+1}/3)")
time.sleep(1)
print("Checking if process has finished...")
# poll() ã¯ããã»ã¹ãçµäºããŠããã°çµäºã³ãŒããããŠããªããã° None ãè¿ã (ãããã¯ããªã)
return_code = process.poll()
if return_code is None:
print("Process is still running.")
else:
print(f"Process finished with return code: {return_code}")
# ããã»ã¹ã®å®äºãåŸ
ã€å Žå㯠wait() ã䜿ã
print("Waiting for process to complete...")
# wait() ã¯ããã»ã¹ãçµäºãããŸã§ãããã¯ããçµäºã³ãŒããè¿ã
final_return_code = process.wait()
print(f"Process completed with return code: {final_return_code}")
# å床 poll() ãåŒã¶ãšãçµäºã³ãŒããè¿ã
print(f"Return code after wait(): {process.poll()}")
Popenãªããžã§ã¯ãã®ã¡ãœãã
Popen
ãªããžã§ã¯ãã«ã¯ãéå§ããããã»ã¹ãå¶åŸ¡ããããã®ã¡ãœãããçšæãããŠããŸãã
ã¡ãœãã | 説æ |
---|---|
poll() | ããã»ã¹ã®ç¶æ
ã確èªããŸããããã»ã¹ãçµäºããŠããã°çµäºã³ãŒããè¿ãããŸã å®è¡äžã§ããã° None ãè¿ããŸãããã®ã¡ãœããã¯ãããã¯ããŸããã |
wait(timeout=None) | ããã»ã¹ãçµäºãããŸã§åŸ
æ©ããçµäºã³ãŒããè¿ããŸããtimeout ãæå®ãããšãæå®ç§æ°åŸ
æ©ããŠãçµäºããªãå Žåã« TimeoutExpired äŸå€ãçºçããŸãã |
communicate(input=None, timeout=None) | ããã»ã¹ãšå¯Ÿè©±ããŸããinput (ãã€ãåãŸã㯠text=True ãªãæåå) ãããã»ã¹ã®æšæºå
¥åã«éããæšæºåºåãšæšæºãšã©ãŒåºåãããã¹ãŠã®ããŒã¿ãèªã¿åã£ãŠãããã»ã¹ã®å®äºãåŸ
ã¡ãŸããæ»ãå€ã¯ (stdout_data, stderr_data) ã®ã¿ãã«ã§ãããã€ãã䜿çšããå Žåããããããã¯ãé¿ããããã« communicate() ã䜿ãã®ãæãå®å
šã§ç°¡åã§ããtimeout ãæå®ã§ããŸãã |
send_signal(signal) | ããã»ã¹ã«ã·ã°ãã«ãéããŸãïŒäŸ: signal.SIGTERM , signal.SIGKILL ïŒãUnixç³»ã·ã¹ãã ã§ã®ã¿æå¹ãªå ŽåããããŸãã |
terminate() | ããã»ã¹ã«çµäºã·ã°ãã« (SIGTERM ) ãéããŸããããã»ã¹ãæ£åžžã«çµäºåŠçãè¡ãæ©äŒãäžããŸãã |
kill() | ããã»ã¹ã«åŒ·å¶çµäºã·ã°ãã« (SIGKILL ) ãéããŸããããã»ã¹ã¯å³åº§ã«çµäºãããããŸããé垞㯠terminate() ã§çµäºããªãå Žåã®æçµæ段ãšããŠäœ¿ããŸãã |
communicate() ã®æ³šæç¹
communicate()
ã¯éåžžã«äŸ¿å©ã§ããã泚æç¹ããããŸãããã®ã¡ãœããã¯ãããã»ã¹ã® stdout ãš stderr ãããã¹ãŠã®åºåãã¡ã¢ãªã«èªã¿èŸŒãããšããŸãããã®ãããéåžžã«å€§éã®åºåãçæããããã»ã¹ã«å¯ŸããŠäœ¿çšãããšãã¡ã¢ãªã倧éã«æ¶è²»ããŠããŸãå¯èœæ§ããããŸãããã®ãããªå Žåã¯ãPopen
ãªããžã§ã¯ãã® stdout
ã stderr
å±æ§ïŒãã¡ã€ã«ã©ã€ã¯ãªããžã§ã¯ãïŒãçŽæ¥ãå°ããã€èªã¿æžãããé«åºŠãªåŠçãå¿
èŠã«ãªãããšããããŸãã
import subprocess
# 倧éã®åºåãçæããäŸ (ããã§ã¯ 'yes' ã³ãã³ãã䜿çš)
# 泚æ: 'yes' ã¯ç¡éã«åºåãç¶ãããããã¿ã€ã ã¢ãŠããæååæ¢ãå¿
èŠã§ã
try:
process = subprocess.Popen(['yes'], stdout=subprocess.PIPE, text=True)
print(f"Started 'yes' process (PID: {process.pid}). Reading first 10 lines...")
line_count = 0
# stdoutããçŽæ¥èªã¿åã (ãªã¢ã«ã¿ã€ã åŠç)
if process.stdout:
for line in process.stdout:
print(line.strip()) # strip() ã§æ¹è¡ãé€å»
line_count += 1
if line_count >= 10:
break # 10è¡èªãã ãã«ãŒããæãã
print("\nTerminating the process...")
process.terminate() # ããã»ã¹ãçµäºããã
# çµäºãç¢ºèª (å°ãåŸ
ã€)
try:
process.wait(timeout=1)
print(f"Process terminated with code: {process.returncode}")
except subprocess.TimeoutExpired:
print("Process did not terminate, killing...")
process.kill() # 匷å¶çµäº
process.wait() # killã®åŸãwaitã§çµäºãåŸ
ã€ã®ã確å®
print(f"Process killed with code: {process.returncode}")
except FileNotFoundError:
print("Error: 'yes' command not found.")
except Exception as e:
print(f"An error occurred: {e}")
ãã®ããã« Popen
ãšãã®ã¡ãœãããçµã¿åãããããšã§ãå€éšããã»ã¹ãããæè»ã«ãéåæçã«å¶åŸ¡ããããšãå¯èœã«ãªããŸãã
ã»ãã¥ãªãã£ã«é¢ãã泚æç¹ ð¡ïž
subprocess
ã¢ãžã¥ãŒã«ã¯éåžžã«åŒ·åã§ããã䜿ãæ¹ã誀ããšé倧ãªã»ãã¥ãªãã£ãªã¹ã¯ãåŒãèµ·ããå¯èœæ§ããããŸããç¹ã«æ³šæãå¿
èŠãªã®ã shell=True
ãªãã·ã§ã³ã®äœ¿çšã§ãã
ã·ã§ã«ã€ã³ãžã§ã¯ã·ã§ã³ã®å±éºæ§ (shell=True)
shell=True
ãæå®ãããšãæž¡ãããã³ãã³ãæååãã·ã¹ãã ã®ã·ã§ã«ã«ãã£ãŠè§£éã»å®è¡ãããŸããããã¯ãã·ã§ã«ã®äŸ¿å©ãªæ©èœïŒãã€ã |
ããªãã€ã¬ã¯ã >
ãã³ãã³ãé£çµ ;
ã &&
ãªã©ïŒã䜿ãããå Žåã«äžèŠäŸ¿å©ã«èŠããŸãã
ããããå€éšããã®å
¥åïŒãŠãŒã¶ãŒå
¥åããã¡ã€ã«ã®å
容ããããã¯ãŒã¯ããã®ããŒã¿ãªã©ïŒãã³ãã³ãæååã«å«ã㊠shell=True
ã§å®è¡ãããšãæªæã®ããã³ãŒããå®è¡ããããã·ã§ã«ã€ã³ãžã§ã¯ã·ã§ã³ãè匱æ§ã«ç¹ããå¯èœæ§ããããŸãã
äŸãã°ããŠãŒã¶ãŒã«å ¥åããããã¡ã€ã«åãåé€ããã€ããã®ã³ãŒããèããŠã¿ãŸãããã
import subprocess
# !!! å±éºãªã³ãŒãäŸ !!!
filename = input("åé€ãããã¡ã€ã«åãå
¥åããŠãã ãã: ")
# ãŠãŒã¶ãŒå
¥åããã®ãŸãŸã³ãã³ãæååã«åã蟌ãã§ãã
command = f"rm {filename}"
print(f"å®è¡ããããšããŠããã³ãã³ã: '{command}'")
# shell=True ã§å®è¡
try:
# ãã®åœ¢åŒã¯çµ¶å¯Ÿã«é¿ããã¹ãïŒ
subprocess.run(command, shell=True, check=True)
print(f"ãã¡ã€ã« '{filename}' ãåé€ããŸããã")
except FileNotFoundError:
print("Error: 'rm' command not found.")
except subprocess.CalledProcessError as e:
print(f"ã³ãã³ãã®å®è¡ã«å€±æããŸãã: {e}")
except Exception as e:
print(f"äºæãã¬ãšã©ãŒ: {e}")
ãã®ã³ãŒãã§ãããæªæã®ãããŠãŒã¶ãŒããã¡ã€ã«åãšã㊠my_file.txt; rm -rf /
ã®ãããªæååãå
¥åãããã©ããªãã§ããããïŒ ã·ã§ã«ã¯ããã2ã€ã®ã³ãã³ããšããŠè§£éããŸãã
rm my_file.txt
rm -rf /
çµæãšããŠãæå³ããªãã³ãã³ã (rm -rf /
) ãå®è¡ãããã·ã¹ãã ã«å£æ»
çãªãã¡ãŒãžãäžããå¯èœæ§ããããŸã ð±ã
å®å šãªä»£æ¿çïŒåŒæ°ãªã¹ãã®äœ¿çš (shell=False)
ã·ã§ã«ã€ã³ãžã§ã¯ã·ã§ã³ãé²ãããã®æãéèŠãªå¯Ÿçã¯ãshell=True
ãé¿ããã³ãã³ããšåŒæ°ãæååã®ãªã¹ããšããŠæž¡ãããšã§ããããã subprocess
ã¢ãžã¥ãŒã«ã®ããã©ã«ã (shell=False
) ã§ãããæšå¥šãããæ¹æ³ã§ãã
import subprocess
import shlex # ã·ã§ã«ã©ã€ã¯ãªåå²ã«äœ¿ãã
# å®å
šãªã³ãŒãäŸ
filename = input("åé€ãããã¡ã€ã«åãå
¥åããŠãã ãã: ")
# ãŠãŒã¶ãŒå
¥åã¯åãªãåŒæ°ãšããŠæ±ããã
command_list = ['rm', filename]
print(f"å®è¡ããããšããŠããã³ãã³ããªã¹ã: {command_list}")
try:
# shell=False (ããã©ã«ã) ã§ãªã¹ããšããŠæž¡ã
subprocess.run(command_list, check=True)
print(f"ãã¡ã€ã« '{filename}' ãåé€ããŸããã")
except FileNotFoundError:
# 'rm' ã³ãã³ãèªäœãèŠã€ãããªãããæå®ããããã¡ã€ã«åãäžæ£ãªå Žå
# (äŸ: ';' ãå«ããã¡ã€ã«åã¯éåžžãªãã®ã§ããã®ãããªå
¥åã¯åŒæ°ãšããŠæž¡ããã
# 'rm' ãããã®ãããªãã¡ã€ã«ã¯ãªãããšãããšã©ãŒãåºãå¯èœæ§ãé«ã)
print(f"Error: 'rm' command not found or invalid argument '{filename}'")
except subprocess.CalledProcessError as e:
# rm ã³ãã³ãã倱æããå Žå (ãã¡ã€ã«ãååšããªããæš©éããªããªã©)
print(f"ã³ãã³ãã®å®è¡ã«å€±æããŸãã (Stderr: {e.stderr})")
except Exception as e:
print(f"äºæãã¬ãšã©ãŒ: {e}")
# ãããŠãŒã¶ãŒå
¥åã«ç©ºçœãªã©ãå«ãŸããå¯èœæ§ãããå Žåããã®ãŸãŸãªã¹ãã«å
¥ããã®ãå®å
š
# äŸ: filename = "my file with spaces.txt" -> command_list = ['rm', 'my file with spaces.txt']
# ã·ã§ã«ã解éããªãã®ã§ããã¡ã€ã«åãæ£ãã 'rm' ã³ãã³ãã«æž¡ããã
# ã©ãããŠãæååããå®å
šã«åå²ãããå Žå㯠shlex.split() ã䜿ã
# user_input = "rm 'my file with spaces.txt'"
# command_list = shlex.split(user_input) # -> ['rm', 'my file with spaces.txt']
# subprocess.run(command_list, check=True)
# ãã ãããŠãŒã¶ãŒå
¥åã shlex.split ã«ãããã®ã¯æšå¥šãããªãããªã¹ãã§çŽæ¥çµã¿ç«ãŠãã¹ãã
shell=False
ã®å Žåãsubprocess
ã¯ã·ã§ã«ãä»ããã«çŽæ¥OSã®ã·ã¹ãã ã³ãŒã«ã䜿ã£ãŠããã»ã¹ãçæããŸãããªã¹ãã®åèŠçŽ ã¯ãOSã«å¯ŸããŠããããã³ãã³ãåããããã第äžåŒæ°ããããã第äºåŒæ°ããšããããã«æ確ã«æž¡ãããŸãããã®ãããåŒæ°ã«å«ãŸããç¹æ®æåïŒ;
, |
, &
, >
ãªã©ïŒã¯ã·ã§ã«ã®ã¡ã¿æåãšããŠè§£éããããåãªãæååãšããŠã³ãã³ãã«æž¡ãããŸããããã«ãããã·ã§ã«ã€ã³ãžã§ã¯ã·ã§ã³ã®ãªã¹ã¯ãæ ¹æ¬çã«æé€ã§ããŸã ðã
çµè«ãšããŠãå€éšããã®å
¥åãã³ãã³ãã®äžéšãšããŠäœ¿çšããå Žåã¯ã絶察㫠shell=True
ã䜿ãããã³ãã³ããšåŒæ°ããªã¹ã圢åŒã§æž¡ããŠãã ããã ã·ã§ã«ã®æ©èœãå¿
èŠãªå Žåã§ããPythonåŽã§ãã€ãã©ã€ã³åŠçãå®è£
ãããªã©ãä»ã®æ¹æ³ãæ€èšãã¹ãã§ãã
ãã®ä»ã®ã»ãã¥ãªãã£èæ ®äºé
- ã³ãã³ãã®ãã¹: ã³ãã³ãåã絶察ãã¹ã§æå®ããããä¿¡é Œã§ãã
PATH
ç°å¢å€æ°ãèšå®ããããšã§ãæå³ããªãã³ãã³ããå®è¡ããããªã¹ã¯ãæžãããŸãã - å®è¡æš©é: ã¹ã¯ãªãããå®è¡ãããŠãŒã¶ãŒã®æš©éãå¿ èŠæå°éã§ããããšã確èªããŠãã ããã
- å ¥åºåã®ãµãã¿ã€ãº: å€éšããã»ã¹ãšã®éã§ããåãããããŒã¿ïŒç¹ã«å€éšããã®å ¥åã«åºã¥ãå ŽåïŒã¯ãé©åã«æ€èšŒã»ãµãã¿ã€ãºããããšãæãŸããã§ãã
shell=True
ã®å©çšã¯ããã®å±éºæ§ãååã«ç解ããå
¥åãå®å
šã«ä¿¡é Œã§ãããšç¢ºä¿¡ã§ããå Žåã«éå®ãã¹ãã§ããå€ãã®å Žåãããå®å
šãªä»£æ¿æ段ãååšããŸããå€ãé¢æ°ã«ã€ã㊠(call, check_call, check_output) ð°ïž
subprocess.run()
ãå°å
¥ãããåïŒPython 3.5ããåïŒã¯ã以äžã®é¢æ°ããã䜿ãããŠããŸãããçŸåšã§ãå©çšå¯èœã§ãããå€ãã®å Žå run()
ã䜿ãæ¹ãåŒæ°ãçµ±äžãããŠãããæè»æ§ãé«ãããæšå¥šãããŸãã
å€ãé¢æ° | æŠèŠ | çžåœãã run() ã®äœ¿ãæ¹ |
---|---|---|
subprocess.call(...) | ã³ãã³ããå®è¡ããå®äºãåŸ ã£ãŠçµäºã³ãŒããè¿ããæšæºåºåããšã©ãŒåºåã¯ãã£ããã£ãããªãïŒèŠªããã»ã¹ã«ãã®ãŸãŸæµããïŒã | subprocess.run(...).returncode |
subprocess.check_call(...) | call() ãšåæ§ã ããçµäºã³ãŒãã 0 ã§ãªãå Žåã« CalledProcessError ãéåºããã | subprocess.run(..., check=True) |
subprocess.check_output(...) | ã³ãã³ããå®è¡ããå®äºãåŸ
ã€ãçµäºã³ãŒãã 0 ã§ãªãå Žå㯠CalledProcessError ãéåºãããæåããå Žåãæšæºåºåããã€ãåãšããŠè¿ããæšæºãšã©ãŒåºåã¯ãã£ããã£ãããªãã | subprocess.run(..., check=True, capture_output=True).stdout ãŸã㯠subprocess.run(..., check=True, stdout=subprocess.PIPE).stdout |
ãããã®å€ãé¢æ°ã¯ãrun()
é¢æ°ã®ç¹å®ã®ãŠãŒã¹ã±ãŒã¹ãç°¡æœã«æžãããã«ååšããŠããŸãããããããrun()
é¢æ°ã¯ãããã®æ©èœããã¹ãŠã«ããŒããããã«å€ãã®ãªãã·ã§ã³ïŒtext
, input
, timeout
, stderrã®ãã£ããã£ãªã©ïŒãæäŸããŸãã
ç¹å¥ãªçç±ããªãéããæ°èŠã®ã³ãŒãã§ã¯ subprocess.run()
ã䜿çšããå¿
èŠã«å¿ã㊠Popen
ãå©çšããã®ãè¯ãã§ãããã
ãããããŠãŒã¹ã±ãŒã¹ ð ïž
subprocess
ã¢ãžã¥ãŒã«ã¯æ§ã
ãªå Žé¢ã§æŽ»çšã§ããŸããããã§ã¯ããã€ãã®å
·äœçãªãŠãŒã¹ã±ãŒã¹ãèŠãŠã¿ãŸãããã
1. ã·ã¹ãã æ å ±ã®ååŸ
OSãæäŸããã³ãã³ãã䜿ã£ãŠã·ã¹ãã æ å ±ãååŸããŸãã
import subprocess
import platform
def get_system_info():
info = {}
system = platform.system()
info['OS'] = f"{system} {platform.release()}"
try:
if system == "Linux":
# CPUæ
å ± (lscpu)
result = subprocess.run(['lscpu'], capture_output=True, text=True, check=True)
# ç°¡åãªæœåº (ããæ£ç¢ºã«ã¯æ£èŠè¡šçŸãªã©ã䜿ã)
for line in result.stdout.splitlines():
if line.startswith("Model name:"):
info['CPU'] = line.split(":", 1)[1].strip()
break
# ã¡ã¢ãªæ
å ± (free -h)
result = subprocess.run(['free', '-h'], capture_output=True, text=True, check=True)
lines = result.stdout.splitlines()
if len(lines) > 1:
# ãããè¡ã®æ¬¡ã®è¡ (Mem:) ã® total, used, free ãååŸ
mem_parts = lines[1].split()
if len(mem_parts) >= 4:
info['Memory (Total/Used/Free)'] = f"{mem_parts[1]} / {mem_parts[2]} / {mem_parts[3]}"
elif system == "Darwin": # macOS
# CPUæ
å ± (sysctl)
result = subprocess.run(['sysctl', '-n', 'machdep.cpu.brand_string'], capture_output=True, text=True, check=True)
info['CPU'] = result.stdout.strip()
# ã¡ã¢ãªæ
å ± (sysctl) - ç©çã¡ã¢ãªãµã€ãº
result = subprocess.run(['sysctl', '-n', 'hw.memsize'], capture_output=True, text=True, check=True)
mem_bytes = int(result.stdout.strip())
info['Memory (Total)'] = f"{mem_bytes / (1024**3):.2f} GiB" # GiBã«å€æ
elif system == "Windows":
# CPUæ
å ± (wmic)
result = subprocess.run(['wmic', 'cpu', 'get', 'Name'], capture_output=True, text=True, check=True, encoding='cp932') # Windowsã¯ãšã³ã³ãŒãã£ã³ã°æ³šæ
lines = result.stdout.splitlines()
if len(lines) > 1:
info['CPU'] = lines[1].strip()
# ã¡ã¢ãªæ
å ± (wmic)
result = subprocess.run(['wmic', 'ComputerSystem', 'get', 'TotalPhysicalMemory'], capture_output=True, text=True, check=True, encoding='cp932')
lines = result.stdout.splitlines()
if len(lines) > 1:
mem_bytes = int(lines[1].strip())
info['Memory (Total)'] = f"{mem_bytes / (1024**3):.2f} GiB"
except FileNotFoundError as e:
print(f"Required command not found: {e}")
except subprocess.CalledProcessError as e:
print(f"Command failed: {e.cmd}, Stderr: {e.stderr}")
except Exception as e:
print(f"An error occurred: {e}")
return info
if __name__ == "__main__":
system_info = get_system_info()
print("--- System Information ---")
for key, value in system_info.items():
print(f"{key}: {value}")
print("-------------------------")
â» Windowsã§ã® wmic
ã³ãã³ãã¯ãå®è¡ç°å¢ãOSããŒãžã§ã³ã«ãã£ãŠãšã³ã³ãŒãã£ã³ã° (äŸ: cp932
) ã®æå®ãå¿
èŠãªå ŽåããããŸãã
2. Gitã³ãã³ãã®å®è¡
ããŒãžã§ã³ç®¡çã·ã¹ãã Gitã®æäœãPythonããè¡ããŸãã
import subprocess
import os
def get_git_branch(repo_path):
if not os.path.isdir(os.path.join(repo_path, '.git')):
return None, "Not a git repository"
try:
# git branch --show-current (Git 2.22以é)
# ãŸã㯠git rev-parse --abbrev-ref HEAD
result = subprocess.run(
['git', 'branch', '--show-current'],
cwd=repo_path, # ãªããžããªã®ãã¹ã§å®è¡
capture_output=True,
text=True,
check=True
)
return result.stdout.strip(), None
except FileNotFoundError:
return None, "Git command not found"
except subprocess.CalledProcessError as e:
# ãã©ã³ãããªãããªã©ã®ãšã©ãŒ
return None, f"Git command failed: {e.stderr.strip()}"
except Exception as e:
return None, f"An unexpected error: {e}"
def git_pull(repo_path):
if not os.path.isdir(os.path.join(repo_path, '.git')):
return False, "Not a git repository"
try:
print(f"Running 'git pull' in {repo_path}...")
result = subprocess.run(
['git', 'pull'],
cwd=repo_path,
capture_output=True, # åºåãèŠãããã«ãã£ããã£
text=True,
check=True # ãšã©ãŒãããã°äŸå€çºç
)
print("Git pull successful.")
print("Output:\n", result.stdout)
return True, result.stdout
except FileNotFoundError:
return False, "Git command not found"
except subprocess.CalledProcessError as e:
print(f"Git pull failed:")
print(f"Stderr: {e.stderr}")
return False, e.stderr
except Exception as e:
print(f"An unexpected error: {e}")
return False, str(e)
if __name__ == "__main__":
repo_directory = "." # ã«ã¬ã³ããã£ã¬ã¯ããªã察象ãšããå Žå
branch, error = get_git_branch(repo_directory)
if error:
print(f"Error getting branch: {error}")
else:
print(f"Current git branch: {branch}")
# ãã«ãå®è¡ (泚æ: å®è¡ãããšãªããžããªãæŽæ°ãããŸã)
# success, message = git_pull(repo_directory)
# if not success:
# print("Failed to pull repository.")
3. å€éšããã°ã©ã ã®å®è¡ãšçµæå©çš
å¥ã®èšèªã§æžãããèšç®ããã°ã©ã ãå®è¡ãããã®çµæãPythonã§å©çšããŸãã
äŸãšããŠãæšæºå
¥åããæ°å€ãåãåãããã®åèšãæšæºåºåã«è¿ãç°¡åãªå€éšããã°ã©ã (calculator.py
ãšãã) ãèããŸãã
# calculator.py (å€éšããã°ã©ã ã®äŸ)
import sys
total = 0
for line in sys.stdin:
try:
total += float(line.strip())
except ValueError:
# æ°å€ã«å€æã§ããªãè¡ã¯ç¡èŠ (ãŸãã¯ãšã©ãŒåºå)
pass
print(total)
ãã® calculator.py
ã subprocess
ã§åŒã³åºãPythonã¹ã¯ãªããã¯ä»¥äžã®ããã«ãªããŸãã
# main_script.py
import subprocess
import sys
numbers_to_sum = "10\n20.5\n30\n-5.5\n" # \n åºåãã§æ°å€ãå
¥å
try:
result = subprocess.run(
[sys.executable, 'calculator.py'], # Pythonã€ã³ã¿ããªã¿çµç±ã§å®è¡
input=numbers_to_sum,
capture_output=True,
text=True,
check=True # calculator.py ããšã©ãŒçµäºãããäŸå€
)
# çµæ (æšæºåºå) ãæ°å€ã«å€æ
total_sum = float(result.stdout.strip())
print(f"The sum calculated by external script is: {total_sum}")
except FileNotFoundError:
print("Error: Python interpreter or calculator.py not found.")
except subprocess.CalledProcessError as e:
print(f"Calculator script failed:")
print(f" Return code: {e.returncode}")
print(f" Stderr: {e.stderr}")
except ValueError:
print(f"Error: Could not convert the script output '{result.stdout.strip()}' to float.")
except Exception as e:
print(f"An unexpected error occurred: {e}")
ãããã®äŸã¯ãsubprocess
ãããã«å€æ§ãªã¿ã¹ã¯ã«å¿çšã§ãããã瀺ããŠããŸãããã¡ã€ã«æäœãããŒã¿åŠçãä»ã®ããŒã«ãšã®é£æºãªã©ãå¯èœæ§ã¯ç¡é倧ã§ã âšã
ãŸãšã ð
Pythonã® subprocess
ã¢ãžã¥ãŒã«ã¯ãå€éšã³ãã³ããããã»ã¹ãå®è¡ã»ç®¡çããããã®åŒ·åã§æè»ãªããŒã«ã§ãããã®èšäºã§ã¯ã以äžã®äž»èŠãªç¹ã«ã€ããŠè§£èª¬ããŸããã
- åºæ¬çãªã³ãã³ãå®è¡ã«ã¯
subprocess.run()
ã䜿ãã®ãæšå¥šãããã - ã³ãã³ããšåŒæ°ã¯ãªã¹ã圢åŒã§æž¡ãã®ãå®å
š (
shell=False
)ã capture_output=True
ãstdout=subprocess.PIPE
,stderr=subprocess.PIPE
ã§åºåããã£ããã£ã§ãããtext=True
ã§å ¥åºåãæååãšããŠæ±ãããinput
åŒæ°ã§æšæºå ¥åã«ããŒã¿ãæž¡ãããcheck=True
ã§ã³ãã³ã倱ææã«CalledProcessError
äŸå€ãçºçããããããtimeout
åŒæ°ã§å®è¡æéå¶éãèšå®ã§ããã- ãã€ãã©ã€ã³åŠçãéåæåŠçã«ã¯
subprocess.Popen
ã¯ã©ã¹ãæå¹ã shell=True
ã¯ã·ã§ã«ã€ã³ãžã§ã¯ã·ã§ã³ã®ãªã¹ã¯ããããããååãšããŠé¿ããã¹ãã
subprocess
ã¢ãžã¥ãŒã«ã䜿ãããªãããšã§ãPythonã¹ã¯ãªããã®å¯èœæ§ã¯å€§ããåºãããŸããã·ã¹ãã ã®èªååãä»ã®ããŒã«ãšã®é£æºãè€éãªã¯ãŒã¯ãããŒã®æ§ç¯ãªã©ãæ§ã
ãªå Žé¢ã§åœ¹ç«ã€ã¯ãã§ãã
ãã ããç¹ã«ã»ãã¥ãªãã£ã«é¢ããŠã¯åžžã«æ³šæãæããå®å šãªæ¹æ³ã§å©çšããããšãå¿ãããŠãã ãããå€éšããã»ã¹ãšã®ããåãã¯ãäºæãã¬åäœãè匱æ§ã®åå ãšãªããããããæ éãªèšèšãšãšã©ãŒãã³ããªã³ã°ãäžå¯æ¬ ã§ãã
ãã®èšäºããçããã® subprocess
ã¢ãžã¥ãŒã«ã®ç解ãæ·±ãã掻çšã®äžå©ãšãªãã°å¹žãã§ããHappy coding! ð
ã³ã¡ã³ã