import popen2 # other popen methods than popen4 can lead to deadlocks # if there is much data on stdout and stderr (err_out, stdin) = popen2.popen4("program args") lines = err_out.read() # collect output into one multiline string (err_out, stdin) = popen2.popen4("program args") lines = err_out.readlines() # collect output into a list, one line per element #----------------------------- (err_out, stdin) = popen2.popen4("program args") output = [] while True: line = err_out.readline() if not line: break output.appen(line) output = ''.join(output) |
import os myfile = "foo.txt" status = os.system("vi %s" % myfile) #----------------------------- import os os.system("cmd1 args | cmd2 | cmd3 >outfile") os.system("cmd args <infile >outfile 2>errfile") status = os.system("%s %s %s" % (program, arg1, arg2)) if status != 0: print "%s exited funny: %s" % (program, status) raise SystemExit |
# ----------------------------- import os import sys import glob args = glob.glob("*.data") try: os.execvp("archive", args) except OSError, e: print "Couldn't replace myself with archive: %s" % err raise SystemExit # The error message does not contain the line number like the "die" in # perl. But if you want to show more information for debugging, you can # delete the try...except and you get a nice traceback which shows all # line numbers and filenames. # ----------------------------- os.execvp("archive", ["accounting.data"]) |
# ------------------------- # Read from a child process import sys import popen2 pipe = popen2.Popen4("program arguments") pid = pipe.pid for line in pipe.fromchild.readlines(): sys.stdout.write(line) # Popen4 provides stdout and stderr. # This avoids deadlocks if you get data # from both streams. # # If you don't need the pid, you # can use popen2.popen4(...) # ----------------------------- # Write to a child process import popen2 pipe = popen2.Popen4("gzip > foo.gz") pid = pipe.pid pipe.tochild.write("Hello zipped world!\n") pipe.tochild.close() # programm will get EOF on STDIN |
class OutputFilter(object): def __init__(self, target, *args, **kwds): self.target = target self.setup(*args, **kwds) self.textbuffer = "" def setup(self, *args, **kwds): pass def write(self, data): if data.endswith("\n"): data = self.process(self.textbuffer + data) self.textbuffer = "" if data is not None: self.target.write(data) else: self.textbuffer += data def process(self, data): return data class HeadFilter(OutputFilter): def setup(self, maxcount): self.count = 0 self.maxcount = maxcount def process(self, data): if self.count < self.maxcount: self.count += 1 return data class NumberFilter(OutputFilter): def setup(self): self.count=0 def process(self, data): self.count += 1 return "%s: %s"%(self.count, data) class QuoteFilter(OutputFilter): def process(self, data): return "> " + data import sys f = HeadFilter(sys.stdout, 100) for i in range(130): print>>f, i print txt = """Welcome to Linux, version 2.0.33 on a i686 "The software required `Windows 95 or better', so I installed Linux." """ f1 = NumberFilter(sys.stdout) f2 = QuoteFilter(f1) for line in txt.split("\n"): print>>f2, line print f1 = QuoteFilter(sys.stdout) f2 = NumberFilter(f1) for line in txt.split("\n"): print>>f2, line |
# This script accepts several filenames
# as argument. If the file is zipped, unzip
# it first. Then read each line if the file
import os
import sys
import popen2
for file in sys.argv[1:]:
if file.endswith(".gz") or file.endswith(".Z"):
(stdout, stdin) = popen2.popen2("gzip -dc '%s'" % file)
fd = stdout
else:
fd = open(file)
for line in fd:
# ....
sys.stdout.write(line)
fd.close()
#-----------------------------
#-----------------------------
# Ask for filename and open it
import sys
print "File, please?"
line = sys.stdin.readline()
file = line.strip() # chomp
open(file) |
# Execute foo_command and read the output import popen2 (stdout_err, stdin) = popen2.popen4("foo_command") for line in stdout_err.readlines(): # .... |
# Open command in a pipe # which reads from stdin and writes to stdout import popen2 pipe = popen2.Popen4("wc -l") # Unix command pipe.tochild.write("line 1\nline 2\nline 3\n") pipe.tochild.close() output = pipe.fromchild.read() |
# popen3: get stdout and stderr of new process # Attetion: This can lead to deadlock, # since the buffer of stderr or stdout might get filled. # You need to use select if you want to avoid this. import popen2 (child_stdout, child_stdin, child_stderr) = popen2.popen3(...) |
# @@INCOMPLETE@@ # @@INCOMPLETE@@ |
# @@INCOMPLETE@@ # @@INCOMPLETE@@ |
# @@INCOMPLETE@@ # @@INCOMPLETE@@ |
# # Print available signals and their value # See "man signal" "man kill" on unix. import signal for name in dir(signal): if name.startswith("SIG"): value = getattr(signal, name) print "%s=%s" % (name, value) |
# You can send signals to processes # with os.kill(pid, signal) |
import signal def get_sig_quit(signum, frame): .... signal.signal(signal.SIGQUIT, get_sig_quit) # Install handler signal.signal(signal.SIGINT, signal.SIG_IGN) # Ignore this signal signal.signal(signal.SIGSTOP, signal.SIG_DFL) # Restore to default handling |
# Example of handler: User must Enter Name ctrl-c does not help import sys import signal def ding(signum, frame): print "\aEnter your name!" return signal.signal(signal.SIGINT, ding) print "Please enter your name:" name = "" while not name: try: name = sys.stdin.readline().strip() except: pass print "Hello: %s" % name |
# @@INCOMPLETE@@ # @@INCOMPLETE@@ |
import signal # ignore signal INT signal.signal(signal.SIGINT, signal.SIG_IGN) # Install signal handler def tsktsk(signum, frame): print "..." signal.signal(signal.SIGINT, tsktsk) |
# @@INCOMPLETE@@ # @@INCOMPLETE@@ |
# @@INCOMPLETE@@ # @@INCOMPLETE@@ |
import signal def handler(signum, frame): raise "timeout" signal.signal(signal.SIGALRM, handler) try: signal.alarm(5) # signal.alarm(3600) # long-time operation while True: print "foo" signal.alarm(0) except: signal.alarm(0) print "timed out" else: print "no time out" |
# @@INCOMPLETE@@ # @@INCOMPLETE@@ |