Each table row shows performance measurements for this PyPy 3 program with a particular command-line input value N.
| N | CPU secs | Elapsed secs | Memory KB | Code B | ≈ CPU Load |
|---|---|---|---|---|---|
| 10,000 | 0.46 | 0.36 | 1,856 | 2011 | 3% 8% 91% 8% 8% 6% 8% 11% |
Read the ↓ make, command line, and program output logs to see how this program was run.
Read k-nucleotide benchmark to see what this program should do.
# The Computer Language Benchmarks Game
# http://benchmarksgame.alioth.debian.org/
#
# submitted by Joerg Baumann
from os import cpu_count
from sys import stdin
from collections import defaultdict
from itertools import starmap, chain
from multiprocessing import Pool
lean_buffer = {}
def lean_args(sequence, reading_frames, i, j):
global lean_buffer
lean_key = len(lean_buffer)
lean_buffer[lean_key] = sequence
return lean_key, reading_frames, i, j
class lean_call:
def __init__(self, func):
self.func = func
def __call__(self, lean_key, reading_frames, i, j):
global lean_buffer
sequence = lean_buffer[lean_key]
results = self.func(sequence, reading_frames, i, j)
lean_results = []
for frame, n, frequences in results:
lean_frequences = defaultdict(int)
for reading_frame, bits_list in reading_frames:
if reading_frame == frame:
for bits in bits_list:
lean_frequences[bits] = frequences[bits]
lean_results.append((frame, n, lean_frequences))
return lean_results
def count_frequencies(sequence, reading_frames, i, j):
frames = tuple(
sorted([frame for frame,_ in reading_frames], reverse=True))
frequences_mask_list = tuple(
((defaultdict(int), (1 << (2 * frame)) - 1) for frame in frames))
frame = frames[0]
frequences, mask = frequences_mask_list[0]
short_frame_frequences = frequences_mask_list[1:]
mono_nucleotides = []
frame_tail = len(frames) - 1
if frame_tail >= 0 and frames[frame_tail] == 1:
freq = frequences_mask_list[frame_tail][0]
worklist = sequence[i:j]
len_before = len(worklist)
while len_before > 0:
n = worklist[0:1]
worklist = worklist.translate(None, n)
len_after = len(worklist)
freq[n[0]] = len_before - len_after
len_before = len_after
mono_nucleotides.append(n)
frame_tail -= 1
if frame_tail >= 0 and frames[frame_tail] == 2 and mono_nucleotides:
freq = frequences_mask_list[frame_tail][0]
worklist = sequence[i:min(j+1, len(sequence))]
overlaps = []
for v in (n + m for n in mono_nucleotides for m in mono_nucleotides):
bits = v[0]*4+v[1]
freq[bits] = worklist.count(v)
if v[1:] == v[:1]:
overlaps.append((v, bits, v[:1]+v))
for v, bits, pattern in overlaps:
count = len(worklist)
tmp = worklist.replace(pattern+pattern, b'12')
tmp = tmp.replace(pattern, b'1')
count = (count - len(tmp)) // 2
count += tmp.count(b'1'+v)
count += tmp.count(b'2'+v[:1])
freq[bits] += count
frame_tail -= 1
short_frame_frequences = short_frame_frequences[:frame_tail]
if len(short_frame_frequences):
bits = 0
if i == 0:
for k in range(i, i + frame - 1):
bits = bits * 4 + sequence[k]
for t, (f, m) in enumerate(short_frame_frequences, 1):
if k - i + 1 >= frames[t]:
f[bits & m] += 1
else:
for k in range(i - frame + 1, i):
bits = bits * 4 + sequence[k]
for byte in sequence[k+1:j]:
bits = (bits * 4 + byte) & mask
frequences[bits] += 1
for f, m in short_frame_frequences:
f[bits & m] += 1
return [
(frame, len(sequence) - frame + 1, frequences_mask_list[i][0])
for i, frame in enumerate(frames)]
def read_sequence(file, header, translation) :
for line in file:
if line[0] == ord('>'):
if line[1:len(header)+1] == header:
break
sequence = bytearray()
for line in file:
if line[0] == ord('>'):
break
sequence += line
return sequence.translate(translation, b'\n\r\t ')
def lookup_frequency(results, frame, bits):
n = 1
frequency = 0
for _, n, frequencies in filter(lambda r: r[0] == frame, results):
frequency += frequencies[bits]
return frequency, n if n > 0 else 1
def display(results, display_list, sort=False, relative=False, end='\n'):
lines = [
(k_nucleotide, lookup_frequency(results, frame, bits))
for k_nucleotide, frame, bits in display_list
]
if sort: lines = sorted(lines, key=lambda v: (-v[1][0], v[0]))
for k_nucleotide, (frequency, n) in lines:
if relative:
print("{0} {1:.3f}".format(k_nucleotide, frequency * 100. / n))
else:
print("{1}\t{0}".format(k_nucleotide, frequency))
print(end=end)
def main():
translation = bytes.maketrans(b'GTCAgtca',
b'\x00\x01\x02\x03\x00\x01\x02\x03')
def str_to_bits(text):
buffer = text.encode('latin1').translate(translation)
bits = 0
for k in range(len(buffer)):
bits = bits * 4 + buffer[k]
return bits
def display_list(k_nucleotides):
return [(n, len(n), str_to_bits(n)) for n in k_nucleotides]
sequence = read_sequence(stdin.buffer, b'THREE', translation)
mono_nucleotides = ('G', 'A', 'T', 'C')
di_nucleotides = tuple(n + m
for n in mono_nucleotides for m in mono_nucleotides)
k_nucleotides = (
'GGT', 'GGTA', 'GGTATT', 'GGTATTTTAATT', 'GGTATTTTAATTTATAGT')
reading_frames = [
(1, tuple(map(str_to_bits, mono_nucleotides))),
(2, tuple(map(str_to_bits, di_nucleotides))),
] + list(map(lambda s: (len(s), (str_to_bits(s),)), k_nucleotides))
if len(sequence) > 128 * cpu_count(): n = cpu_count()
else: n = 1
partitions = [len(sequence) * i // n for i in range(n+1)]
count_jobs = [
(sequence, reading_frames, partitions[i], partitions[i + 1])
for i in range(len(partitions) - 1)]
if n == 1:
results = list(chain(*starmap(count_frequencies, count_jobs)))
else:
lean_jobs = list(starmap(lean_args, count_jobs))
with Pool() as pool:
async_results = pool.starmap_async(
lean_call(count_frequencies), lean_jobs)
results = list(chain(*async_results.get()))
display(results, display_list(mono_nucleotides), relative=True, sort=True)
display(results, display_list(di_nucleotides), relative=True, sort=True)
display(results, display_list(k_nucleotides), end='')
if __name__=='__main__' :
main()
Fri, 09 Sep 2022 06:02:54 GMT COMMAND LINE: /usr/bin/pypy3 knucleotide.pypy3-3.pypy3 0 < knucleotide-input10000.txt PROGRAM OUTPUT: A 30.284 T 29.796 C 20.312 G 19.608 AA 9.212 AT 8.950 TT 8.948 TA 8.936 CA 6.166 CT 6.100 AC 6.086 TC 6.042 AG 6.036 GA 5.968 TG 5.868 GT 5.798 CC 4.140 GC 4.044 CG 3.906 GG 3.798 562 GGT 152 GGTA 15 GGTATT 0 GGTATTTTAATT 0 GGTATTTTAATTTATAGT