1 year ago

#336316

Romain

OSError while computing large amount of data with dask

I have a large amount of data (*.grib) that I load using xarray and dask.

To keep it simple, my data is a record of world temperatures during the month of January 2022. Data are collected multiple times per day (time dimension) at every 0.5 degrees of latitude and longitude across the globe. I'm trying to determine, for each grid point, the minimum temperature recorded during January. My result is a 2D matrix (720*361).

My dataset is the following:

<xarray.Dataset>
Dimensions:     (time: 1426, latitude: 361, longitude: 720)
Coordinates:
    number      int32 0
  * time        (time) datetime64[ns] 1999-01-01 ... 2021-01-31T12:00:00
    step        timedelta64[ns] 00:00:00
    surface     float64 0.0
  * latitude    (latitude) float64 90.0 89.5 89.0 88.5 ... -89.0 -89.5 -90.0
  * longitude   (longitude) float64 0.0 0.5 1.0 1.5 ... 358.0 358.5 359.0 359.5
    valid_time  (time) datetime64[ns] dask.array<chunksize=(100,), meta=np.ndarray>
Data variables:
    t2m         (time, latitude, longitude) float32 dask.array<chunksize=(100, 361, 720), meta=np.ndarray>
...

Here is an example of what I'm doing, followed by the error:

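import numpy as np
import xarray as xr

# Open the GRIB file lazily, split into dask chunks of 100 time steps
ds = xr.open_dataset("mylargefile.grib", engine="cfgrib", chunks={"time": 100})

# Compute the minimum over the time dimension (named t2m_min to avoid shadowing the built-in min)
t2m_min = ds.t2m.min(dim="time")

# Trigger the computation and display the result
print(t2m_min.compute())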

Traceback (most recent call last):
  File "reader.py", line 183, in <module>
    calculate_stats()
  File "reader.py", line 72, in calculate_stats
    min = min.compute()
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\base.py", line 288, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\base.py", line 570, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\threaded.py", line 87, in get
    **kwargs,
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\local.py", line 507, in get_async
    raise_exception(exc, tb)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\local.py", line 315, in reraise
    raise exc
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\local.py", line 220, in execute_task
    result = _execute_task(task, data)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\dask\array\core.py", line 108, in getter
    c = np.asarray(c)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\xarray\core\indexing.py", line 357, in __array__
    return np.asarray(self.array, dtype=dtype)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\xarray\core\indexing.py", line 521, in __array__
    return np.asarray(self.array, dtype=dtype)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\xarray\core\indexing.py", line 422, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\cfgrib\xarray_plugin.py", line 145, in __getitem__
    key, self.shape, xr.core.indexing.IndexingSupport.BASIC, self._getitem
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\xarray\core\indexing.py", line 711, in explicit_indexing_adapter
    result = raw_indexing_method(raw_key.tuple)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\cfgrib\xarray_plugin.py", line 150, in _getitem
    return self.array[key]
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\cfgrib\dataset.py", line 342, in __getitem__
    message = self.index.get_field(message_ids[0])  # type: ignore
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\cfgrib\messages.py", line 472, in get_field
    return ComputedKeysAdapter(self.fieldset[message_id], self.computed_keys)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\cfgrib\messages.py", line 332, in __getitem__
    return self.message_from_file(file, offset=item)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\cfgrib\messages.py", line 328, in message_from_file
    return Message.from_file(file, offset, **kwargs)
  File "C:\Users\chikirou\Anaconda3\envs\py37\lib\site-packages\cfgrib\messages.py", line 91, in from_file
    file.seek(offset)
OSError: [Errno 22] Invalid argument

Is it a memory problem? Maybe compute() is not the method I need, but what should I use instead?
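As a rough check on the memory question, the in-memory sizes can be estimated from the dimensions in the dataset summary above. This is only a back-of-the-envelope sketch: it assumes 4 bytes per value (t2m is float32) and the chunking of 100 time steps used in open_dataset, and it says nothing about the size of the GRIB file on disk.

```python
import math

# Dimensions and dtype taken from the dataset summary in this post.
n_time, n_lat, n_lon = 1426, 361, 720
bytes_per_value = 4   # float32
chunk_time = 100      # from chunks={"time": 100}

bytes_per_step = n_lat * n_lon * bytes_per_value   # one 2D field
chunk_bytes = chunk_time * bytes_per_step          # one full dask chunk
total_bytes = n_time * bytes_per_step              # whole t2m array
n_chunks = math.ceil(n_time / chunk_time)          # last chunk is partial

print(f"one time step: {bytes_per_step / 2**20:.2f} MiB")
print(f"one chunk:     {chunk_bytes / 2**20:.2f} MiB")
print(f"whole array:   {total_bytes / 2**30:.2f} GiB in {n_chunks} chunks")
```

Under these assumptions a chunk is roughly 99 MiB and the full array about 1.38 GiB. Note also that the traceback ends in file.seek(offset) inside cfgrib rather than in an allocation, so this estimate alone does not establish whether memory is the cause.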

EDIT: Efficiency

For efficiency, thanks to @mrdurant, I'm now using ds.t2m.min(dim=("time")) to calculate the minimum.

Previously, I was doing the following:

t2m = ds.variables['t2m'].data
running_min = t2m[0]  # start from the first time step
for temperature in t2m[1:]:
    running_min = np.minimum(running_min, temperature)
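To illustrate why the two approaches should give the same result, here is a minimal sketch on a small synthetic array. It uses plain NumPy with made-up values and a made-up shape (12, 4, 8) standing in for (time, latitude, longitude), not the real GRIB data, and min(axis=0) stands in for what .min(dim="time") expresses in xarray.

```python
import numpy as np

# Synthetic stand-in for t2m; the values and shape are invented for illustration.
rng = np.random.default_rng(0)
t2m = rng.normal(280.0, 10.0, size=(12, 4, 8)).astype(np.float32)

# Running minimum, one time step at a time (the earlier loop).
running_min = t2m[0]
for temperature in t2m[1:]:
    running_min = np.minimum(running_min, temperature)

# Single reduction over the time axis.
reduced_min = t2m.min(axis=0)

# Both give the same 2D field of per-point minima.
print(np.array_equal(running_min, reduced_min))
print(reduced_min.shape)
```

One difference to keep in mind with real data: xarray's min skips NaN values by default for floating-point arrays, while np.minimum propagates them, so the results can differ wherever values are missing.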

python

numpy

dask

python-xarray

grib

0 Answers
