Skip to content
Permalink
Browse files
parallel processing
  • Loading branch information
dungranij committed Jul 5, 2021
1 parent 013177e commit 19b94d5c6434b1a736cddc3a2772bc06c3331ae3
Showing 1 changed file with 125 additions and 0 deletions.
@@ -0,0 +1,125 @@
function ParallelProcessing
%% 1: Load Data
clear all
close all

FileName = '/Users/juhildungrani/Desktop/Sem 4/Big Data/work/5011CEM2021-dungranij/2a Data Exploration/Model 2/o3_surface_20180701000000.nc';

Contents = ncinfo(FileName);

Lat = ncread(FileName, 'lat');
Lon = ncread(FileName, 'lon');
NumHours = 25;

%% 2: Processing parameters
% ## provided by customer ##
RadLat = 30.2016;
RadLon = 24.8032;
RadO3 = 4.2653986e-08;

StartLat = 1;
NumLat = 400;
StartLon = 1;
NumLon = 700;

%% 3: Pre-allocate output array memory
% the '-4' value is due to the analysis method resulting in fewer output
% values than the input array.
NumLocations = (NumLon - 4) * (NumLat - 4);
EnsembleVectorPar = zeros(NumLocations, NumHours); % pre-allocate memory

%% 4: Cycle through the hours and load all the models for each hour and record memory use
% We use an index named 'NumHour' in our loop
% The section 'parallel processing' will process the data location one
% after the other, reporting on the time involved.
tic
for idxTime = 1:5 %NumHours

%% 5: Load the data for each hour
% Each hour we read the data from the required models, defined by the
% index variable. Each model data are placed on a 'layer' of the 3D
% array resulting in a 7 x 700 x 400 array.
% We do this by indexing through the model names, then defining the
% start position as the beginnning of the Lat, beginning of the Lon and
% beginning of the new hour. We then define the number of elements
% along each data dimension, so the total number of Lat, the total
% number of Lon, but only 1 hour.
% You can use these values to select a smaller sub-set of the data if
% required to speed up testing o fthe functionality.

DataLayer = 1;
for idx = [1, 2, 4, 5, 6, 7, 8]
HourlyData(DataLayer,:,:) = ncread(FileName, Contents.Variables(idx).Name,...
[StartLon, StartLat, idxTime], [NumLon, NumLat, 1]);
DataLayer = DataLayer + 1;
end

%% 6: Pre-process the data for parallel processing
% This takes the 3D array of data [model, lat, lon] and generates the
% data required to be processed at each location.
% ## This process is defined by the customer ##
% If you want to know the details, please ask, but this is not required
% for the module or assessment.
[Data2Process, LatLon] = PrepareData(HourlyData, Lat, Lon);


%% Parallel Analysis
%% 7: Create the parallel pool and attache files for use
PoolSize = 4 ; % define the number of processors to use in parallel
if isempty(gcp('nocreate'))
parpool('local',PoolSize);
end
poolobj = gcp;
% attaching a file allows it to be available at each processor without
% passing the file each time. This speeds up the process. For more
% information, ask your tutor.
addAttachedFiles(poolobj,{'EnsembleValue'});

% %% 8: Parallel processing is difficult to monitor progress so we define a
% % special function to create a wait bar which is updated after each
% % process completes an analysis. The update function is defined at the
% % end of this script. Each time a parallel process competes it runs the
% % function to update the waitbar.
DataQ = parallel.pool.DataQueue; % Create a variable in the parallel pool
%
% % Create a waitbar and handle top it:
hWaitBar = waitbar(0, sprintf('Time period %i, Please wait ...', idxTime));
% % Define the function to call when new data is received in the data queue
% % 'DataQ'. See end of script for the function definition.
afterEach(DataQ, @nUpdateWaitbar);
N = size(Data2Process,1); % the total number of data to process
p = 20; % offset so the waitbar shows some colour quickly.

%% 9: The actual parallel processing!
% Ensemble value is a function defined by the customer to calculate the
% ensemble value at each location. Understanding this function is not
% required for the module or the assessment, but it is the reason for
% this being a 'big data' project due to the processing time (not the
% pure volume of raw data alone).
T4 = toc;
parfor idx = 1: 100 % size(Data2Process,1)
[EnsembleVectorPar(idx, idxTime)] = EnsembleValue(Data2Process(idx,:,:,:), LatLon, RadLat, RadLon, RadO3);
send(DataQ, idx);
end

close(hWaitBar); % close the wait bar

T3(idxTime) = toc - T4; % record the parallel processing time for this hour of data
fprintf('Parallel processing time for hour %i : %.1f s\n', idxTime, T3(idxTime))

end % end time loop
T2 = toc;
delete(gcp);

%% 10: Reshape ensemble values to Lat, lon, hour format
EnsembleVectorPar = reshape(EnsembleVectorPar, 696, 396, []);
fprintf('Total processing time for %i workers = %.2f s\n', PoolSize, sum(T3));

%% 11: ### PROCESSING COMPLETE DATA NEEDS TO BE SAVED ###

function nUpdateWaitbar(~) % nested function
waitbar(p/N, hWaitBar, sprintf('Hour %i, %.3f complete, %i out of %i', idxTime, p/N*100, p, N));
p = p + 1;
end

end % end function

0 comments on commit 19b94d5

Please sign in to comment.