Skip to content

Quantum

A set of tools for quantum calculations.

A Qubit in a regular computer is quantum of algorithm that is executed in one iteration of a cycle in a separate processor thread.

Quantum is a function with an algorithm of task for data processing.

In this case, the Qubit is not a single information, but it is a concept of the principle of operation of quantum calculations on a regular computer.

The module contains the following tools:

  • LoopMode - Quantum loop mode.
  • count_qubits() - Counting the number of conceptual qubits of your computer.
  • QuantumLoop - Separation of the cycle into quantum algorithms for multiprocessing data processing.

LoopMode

Bases: Enum

Quantum loop mode.

Source code in src\xloft\quantum.py
27
28
29
30
31
class LoopMode(Enum):
    """Quantum loop mode."""

    PROCESS_POOL = 1
    THREAD_POOL = 2

QuantumLoop

Separation of the cycle into quantum algorithms for multiprocessing data processing.

Examples:

>>> from xloft.quantum import QuantumLoop
>>> def task(item):
... return item * item
>>> data = range(10)
>>> QuantumLoop(task, data).run()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Parameters:

Name Type Description Default
task Callable

Function with a task algorithm.

required
data Iterable[Any]

The data that needs to be processed.

required
max_workers int | None

The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors.

None
timeout float | None

The maximum number of seconds to wait. If None, then there is no limit on the wait time.

None
chunksize int

The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

1
mode LoopMode

The operating mode for a quantum loop: LoopMode.PROCESS_POOL | LoopMode.THREAD_POOL.

PROCESS_POOL
Source code in src\xloft\quantum.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class QuantumLoop:
    """Separation of the cycle into quantum algorithms for multiprocessing data processing.

    Examples:
        >>> from xloft.quantum import QuantumLoop
        >>> def task(item):
        ... return item * item
        >>> data = range(10)
        >>> QuantumLoop(task, data).run()
        [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

    Args:
        task: Function with a task algorithm.
        data: The data that needs to be processed.
        max_workers: The maximum number of processes that can be used to
                     execute the given calls. If None or not given then as many
                     worker processes will be created as the machine has processors.
        timeout: The maximum number of seconds to wait. If None, then there
                 is no limit on the wait time.
        chunksize: The size of the chunks the iterable will be broken into
                   before being passed to a child process. This argument is only
                   used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.
        mode: The operating mode for a quantum loop: LoopMode.PROCESS_POOL | LoopMode.THREAD_POOL.
    """

    def __init__(  # noqa: D107
        self,
        task: Callable,
        data: Iterable[Any],
        max_workers: int | None = None,
        timeout: float | None = None,
        chunksize: int = 1,
        mode: LoopMode = LoopMode.PROCESS_POOL,
    ) -> None:
        self.quantum = task
        self.data = data
        self.max_workers = max_workers
        self.timeout = timeout
        self.chunksize = chunksize
        self.mode = mode

    def process_pool(self) -> list[Any]:
        """Better suitable for operations for which large processor resources are required."""
        with concurrent.futures.ProcessPoolExecutor(self.max_workers) as executor:
            results = list(
                executor.map(
                    self.quantum,
                    self.data,
                    timeout=self.timeout,
                    chunksize=self.chunksize,
                ),
            )
        return results  # noqa: RET504

    def thread_pool(self) -> list[Any]:
        """More suitable for tasks related to input-output
        (for example, network queries, file operations),
        where GIL is freed during input-output operations."""  # noqa: D205, D209
        with concurrent.futures.ThreadPoolExecutor(self.max_workers) as executor:
            results = list(
                executor.map(
                    self.quantum,
                    self.data,
                    timeout=self.timeout,
                    chunksize=self.chunksize,
                ),
            )
        return results  # noqa: RET504

    def run(self) -> list[Any]:
        """Run the quantum loop."""
        results: list[Any] = []
        match self.mode.value:
            case 1:
                results = self.process_pool()
            case 2:
                results = self.thread_pool()
            case _ as unreachable:
                assert_never(Never(unreachable))
        return results

process_pool()

Better suitable for operations for which large processor resources are required.

Source code in src\xloft\quantum.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def process_pool(self) -> list[Any]:
    """Better suitable for operations for which large processor resources are required."""
    with concurrent.futures.ProcessPoolExecutor(self.max_workers) as executor:
        results = list(
            executor.map(
                self.quantum,
                self.data,
                timeout=self.timeout,
                chunksize=self.chunksize,
            ),
        )
    return results  # noqa: RET504

run()

Run the quantum loop.

Source code in src\xloft\quantum.py
122
123
124
125
126
127
128
129
130
131
132
def run(self) -> list[Any]:
    """Run the quantum loop."""
    results: list[Any] = []
    match self.mode.value:
        case 1:
            results = self.process_pool()
        case 2:
            results = self.thread_pool()
        case _ as unreachable:
            assert_never(Never(unreachable))
    return results

thread_pool()

More suitable for tasks related to input-output (for example, network queries, file operations), where GIL is freed during input-output operations.

Source code in src\xloft\quantum.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def thread_pool(self) -> list[Any]:
    """More suitable for tasks related to input-output
    (for example, network queries, file operations),
    where GIL is freed during input-output operations."""  # noqa: D205, D209
    with concurrent.futures.ThreadPoolExecutor(self.max_workers) as executor:
        results = list(
            executor.map(
                self.quantum,
                self.data,
                timeout=self.timeout,
                chunksize=self.chunksize,
            ),
        )
    return results  # noqa: RET504

count_qubits()

Counting the number of conceptual qubits of your computer.

Conceptual qubit is quantum of algorithm (task) that is executed in iterations of a cycle in a separate processor thread.

Quantum of algorithm is a function for data processing.

Examples:

>>> from xloft.quantum import count_qubits
>>> count_qubits()
16

Returns:

Type Description
int

The number of conceptual qubits.

Source code in src\xloft\quantum.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def count_qubits() -> int:
    """Counting the number of conceptual qubits of your computer.

    Conceptual qubit is quantum of algorithm (task) that is executed in
    iterations of a cycle in a separate processor thread.

    Quantum of algorithm is a function for data processing.

    Examples:
        >>> from xloft.quantum import count_qubits
        >>> count_qubits()
        16

    Returns:
        The number of conceptual qubits.
    """
    return multiprocessing.cpu_count()